This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 56dc7f8c2199 [SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2
56dc7f8c2199 is described below
commit 56dc7f8c2199bd3f2822004a1d9853188a9db465
Author: Huaxin Gao <[email protected]>
AuthorDate: Thu Dec 21 10:13:55 2023 +0800
[SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2
### What changes were proposed in this pull request?
Add `MergeInto` support in `DataFrameWriterV2`
### Why are the changes needed?
Spark currently supports the MERGE INTO SQL statement. We want the DataFrame API to have the same support.
### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a new API like the following:
```
spark.table("source")
.mergeInto("target", $"source.id" === $"target.id")
.whenNotMatched()
.insertAll()
.merge()
```
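A fuller example combining all three clause types (mirroring the example in the new `Dataset.mergeInto` scaladoc below):
```
spark.table("source")
  .mergeInto("target", $"source.id" === $"target.id")
  .whenMatched($"salary" === 100)
  .delete()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource($"salary" === 100)
  .update(Map(
    "salary" -> lit(200)
  ))
  .merge()
```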
### How was this patch tested?
new tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44119 from huaxingao/mergeinto.
Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
---
.../src/main/resources/error/error-classes.json | 6 +
.../CheckConnectJvmClientCompatibility.scala | 11 +-
docs/sql-error-conditions.md | 6 +
.../main/scala/org/apache/spark/sql/Dataset.scala | 32 +
.../org/apache/spark/sql/MergeIntoWriter.scala | 329 +++++++
.../sql/connector/MergeIntoDataFrameSuite.scala | 946 +++++++++++++++++++++
6 files changed, 1329 insertions(+), 1 deletion(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 930042505379..df223f3298ef 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2846,6 +2846,12 @@
],
"sqlState" : "42000"
},
+ "NO_MERGE_ACTION_SPECIFIED" : {
+ "message" : [
+ "df.mergeInto needs to be followed by at least one of
whenMatched/whenNotMatched/whenNotMatchedBySource."
+ ],
+ "sqlState" : "42K0E"
+ },
"NO_SQL_TYPE_IN_PROTOBUF_SCHEMA" : {
"message" : [
"Cannot find <catalystFieldPath> in Protobuf schema."
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index a9b6f102a512..bd5ff6af7464 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -298,7 +298,16 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.artifact.util.ArtifactUtils"),
ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.artifact.util.ArtifactUtils$"))
+ "org.apache.spark.sql.artifact.util.ArtifactUtils$"),
+
+ // MergeIntoWriter
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.MergeIntoWriter"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatchedBySource"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatchedBySource$"))
checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
}
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 94c7c167e392..a1af6863913e 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1640,6 +1640,12 @@ Can't determine the default value for `<colName>` since it is not nullable and i
No handler for UDAF '`<functionName>`'. Use sparkSession.udf.register(...) instead.
+### NO_MERGE_ACTION_SPECIFIED
+
+[SQLSTATE: 42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+df.mergeInto needs to be followed by at least one of whenMatched/whenNotMatched/whenNotMatchedBySource.
+
### NO_SQL_TYPE_IN_PROTOBUF_SCHEMA
[SQLSTATE: 42S22](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 2256f9db8c64..a3dc976647be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -4129,6 +4129,38 @@ class Dataset[T] private[sql](
new DataFrameWriterV2[T](table, this)
}
+ /**
+ * Merges a set of updates, insertions, and deletions based on a source table into
+ * a target table.
+ *
+ * Scala Examples:
+ * {{{
+ * spark.table("source")
+ * .mergeInto("target", $"source.id" === $"target.id")
+ * .whenMatched($"salary" === 100)
+ * .delete()
+ * .whenNotMatched()
+ * .insertAll()
+ * .whenNotMatchedBySource($"salary" === 100)
+ * .update(Map(
+ * "salary" -> lit(200)
+ * ))
+ * .merge()
+ * }}}
+ *
+ * @group basic
+ * @since 4.0.0
+ */
+ def mergeInto(table: String, condition: Column): MergeIntoWriter[T] = {
+ if (isStreaming) {
+ logicalPlan.failAnalysis(
+ errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
+ messageParameters = Map("methodName" -> toSQLId("mergeInto")))
+ }
+
+ new MergeIntoWriter[T](table, this, condition)
+ }
+
/**
* Interface for saving the content of the streaming Dataset out into external storage.
*
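As a sketch of the streaming guard inside `mergeInto` above (the `rate` source and column names are only illustrative):
```
// Fails analysis with CALL_ON_STREAMING_DATASET_UNSUPPORTED:
// mergeInto is not supported on streaming Datasets.
val streamingDF = spark.readStream.format("rate").load()
streamingDF.mergeInto("target", $"value" === $"target.id")
```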
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala
new file mode 100644
index 000000000000..ca04b9bfc55f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/MergeIntoWriter.scala
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkRuntimeException
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction, UpdateStarAction}
+import org.apache.spark.sql.functions.expr
+
+/**
+ * `MergeIntoWriter` provides methods to define and execute merge actions based
+ * on specified conditions.
+ *
+ * @tparam T the type of data in the Dataset.
+ * @param table the name of the target table for the merge operation.
+ * @param ds the source Dataset to merge into the target table.
+ * @param on the merge condition.
+ *
+ * @since 4.0.0
+ */
+@Experimental
+class MergeIntoWriter[T] private[sql] (table: String, ds: Dataset[T], on: Column) {
+
+ private val df: DataFrame = ds.toDF()
+
+ private val sparkSession = ds.sparkSession
+
+ private val tableName = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(table)
+
+ private val logicalPlan = df.queryExecution.logical
+
+ private[sql] var matchedActions: Seq[MergeAction] = Seq.empty[MergeAction]
+ private[sql] var notMatchedActions: Seq[MergeAction] = Seq.empty[MergeAction]
+ private[sql] var notMatchedBySourceActions: Seq[MergeAction] = Seq.empty[MergeAction]
+
+ /**
+ * Initialize a `WhenMatched` action without any condition.
+ *
+ * This `WhenMatched` action will be executed when a source row matches a target table row
+ * based on the merge condition.
+ *
+ * This `WhenMatched` can be followed by one of the following merge actions:
+ *   - `updateAll`: Update all the matched target table rows with source dataset rows.
+ *   - `update(Map)`: Update all the matched target table rows while changing only
+ *     a subset of columns based on the provided assignment.
+ *   - `delete`: Delete all target rows that have a match in the source table.
+ *
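+ * Example (an illustrative sketch; table and column names are hypothetical):
+ * {{{
+ *   spark.table("source")
+ *     .mergeInto("target", $"source.id" === $"target.id")
+ *     .whenMatched()
+ *     .updateAll()
+ *     .merge()
+ * }}}
+ *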
+ * @return a new `WhenMatched` object.
+ */
+ def whenMatched(): WhenMatched[T] = {
+ new WhenMatched[T](this, None)
+ }
+
+ /**
+ * Initialize a `WhenMatched` action with a condition.
+ *
+ * This `WhenMatched` action will be executed when a source row matches a target table row
+ * based on the merge condition and the specified `condition` is satisfied.
+ *
+ * This `WhenMatched` can be followed by one of the following merge actions:
+ *   - `updateAll`: Update all the matched target table rows with source dataset rows.
+ *   - `update(Map)`: Update all the matched target table rows while changing only
+ *     a subset of columns based on the provided assignment.
+ *   - `delete`: Delete all target rows that have a match in the source table.
+ *
+ * @param condition a `Column` representing the condition to be evaluated for the action.
+ * @return a new `WhenMatched` object configured with the specified condition.
+ */
+ def whenMatched(condition: Column): WhenMatched[T] = {
+ new WhenMatched[T](this, Some(condition.expr))
+ }
+
+ /**
+ * Initialize a `WhenNotMatched` action without any condition.
+ *
+ * This `WhenNotMatched` action will be executed when a source row does not match any target
+ * row based on the merge condition.
+ *
+ * This `WhenNotMatched` can be followed by one of the following merge actions:
+ *   - `insertAll`: Insert all rows from the source that are not already in the target table.
+ *   - `insert(Map)`: Insert all rows from the source that are not already in the target table,
+ *     with the specified columns based on the provided assignment.
+ *
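+ * Example (an illustrative sketch; table and column names are hypothetical):
+ * {{{
+ *   spark.table("source")
+ *     .mergeInto("target", $"source.id" === $"target.id")
+ *     .whenNotMatched()
+ *     .insertAll()
+ *     .merge()
+ * }}}
+ *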
+ * @return a new `WhenNotMatched` object.
+ */
+ def whenNotMatched(): WhenNotMatched[T] = {
+ new WhenNotMatched[T](this, None)
+ }
+
+ /**
+ * Initialize a `WhenNotMatched` action with a condition.
+ *
+ * This `WhenNotMatched` action will be executed when a source row does not match any target
+ * row based on the merge condition and the specified `condition` is satisfied.
+ *
+ * This `WhenNotMatched` can be followed by one of the following merge actions:
+ *   - `insertAll`: Insert all rows from the source that are not already in the target table.
+ *   - `insert(Map)`: Insert all rows from the source that are not already in the target table,
+ *     with the specified columns based on the provided assignment.
+ *
+ * @param condition a `Column` representing the condition to be evaluated for the action.
+ * @return a new `WhenNotMatched` object configured with the specified condition.
+ */
+ def whenNotMatched(condition: Column): WhenNotMatched[T] = {
+ new WhenNotMatched[T](this, Some(condition.expr))
+ }
+
+ /**
+ * Initialize a `WhenNotMatchedBySource` action without any condition.
+ *
+ * This `WhenNotMatchedBySource` action will be executed when a target row does not match any
+ * rows in the source table based on the merge condition.
+ *
+ * This `WhenNotMatchedBySource` can be followed by one of the following merge actions:
+ *   - `updateAll`: Update all the not matched target table rows with source dataset rows.
+ *   - `update(Map)`: Update all the not matched target table rows while changing only
+ *     the specified columns based on the provided assignment.
+ *   - `delete`: Delete all target rows that have no matches in the source table.
+ *
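+ * Example (an illustrative sketch; table and column names are hypothetical):
+ * {{{
+ *   spark.table("source")
+ *     .mergeInto("target", $"source.id" === $"target.id")
+ *     .whenNotMatchedBySource()
+ *     .delete()
+ *     .merge()
+ * }}}
+ *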
+ * @return a new `WhenNotMatchedBySource` object.
+ */
+ def whenNotMatchedBySource(): WhenNotMatchedBySource[T] = {
+ new WhenNotMatchedBySource[T](this, None)
+ }
+
+ /**
+ * Initialize a `WhenNotMatchedBySource` action with a condition.
+ *
+ * This `WhenNotMatchedBySource` action will be executed when a target row does not match any
+ * rows in the source table based on the merge condition and the specified `condition`
+ * is satisfied.
+ *
+ * This `WhenNotMatchedBySource` can be followed by one of the following merge actions:
+ *   - `updateAll`: Update all the not matched target table rows with source dataset rows.
+ *   - `update(Map)`: Update all the not matched target table rows while changing only
+ *     the specified columns based on the provided assignment.
+ *   - `delete`: Delete all target rows that have no matches in the source table.
+ *
+ * @param condition a `Column` representing the condition to be evaluated for the action.
+ * @return a new `WhenNotMatchedBySource` object configured with the specified condition.
+ */
+ def whenNotMatchedBySource(condition: Column): WhenNotMatchedBySource[T] = {
+ new WhenNotMatchedBySource[T](this, Some(condition.expr))
+ }
+
+ /**
+ * Executes the merge operation.
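+ *
+ * @note Throws a `SparkRuntimeException` with error class `NO_MERGE_ACTION_SPECIFIED`
+ * if no `whenMatched`/`whenNotMatched`/`whenNotMatchedBySource` action was specified.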
+ */
+ def merge(): Unit = {
+ if (matchedActions.isEmpty && notMatchedActions.isEmpty && notMatchedBySourceActions.isEmpty) {
+ throw new SparkRuntimeException(
+ errorClass = "NO_MERGE_ACTION_SPECIFIED",
+ messageParameters = Map.empty)
+ }
+
+ val merge = MergeIntoTable(
+ UnresolvedRelation(tableName),
+ logicalPlan,
+ on.expr,
+ matchedActions,
+ notMatchedActions,
+ notMatchedBySourceActions)
+ val qe = sparkSession.sessionState.executePlan(merge)
+ qe.assertCommandExecuted()
+ }
+
+ private[sql] def withNewMatchedAction(action: MergeAction): MergeIntoWriter[T] = {
+ this.matchedActions = this.matchedActions :+ action
+ this
+ }
+
+ private[sql] def withNewNotMatchedAction(action: MergeAction): MergeIntoWriter[T] = {
+ this.notMatchedActions = this.notMatchedActions :+ action
+ this
+ }
+
+ private[sql] def withNewNotMatchedBySourceAction(action: MergeAction): MergeIntoWriter[T] = {
+ this.notMatchedBySourceActions = this.notMatchedBySourceActions :+ action
+ this
+ }
+}
+
+/**
+ * A class for defining actions to be taken when matching rows in a DataFrame during
+ * a merge operation.
+ *
+ * @param mergeIntoWriter The MergeIntoWriter instance responsible for writing data to a
+ *                        target DataFrame.
+ * @param condition       An optional condition Expression that specifies when the actions
+ *                        should be applied. If the condition is None, the actions will be
+ *                        applied to all matched rows.
+ *
+ * @tparam T The type of data in the MergeIntoWriter.
+ */
+case class WhenMatched[T] private[sql](
+ mergeIntoWriter: MergeIntoWriter[T],
+ condition: Option[Expression]) {
+ /**
+ * Specifies an action to update all matched rows in the DataFrame.
+ *
+ * @return The MergeIntoWriter instance with the update all action configured.
+ */
+ def updateAll(): MergeIntoWriter[T] = {
+ mergeIntoWriter.withNewMatchedAction(UpdateStarAction(condition))
+ }
+
+ /**
+ * Specifies an action to update matched rows in the DataFrame with the provided column
+ * assignments.
+ *
+ * @param map A Map of column names to Column expressions representing the updates to be applied.
+ * @return The MergeIntoWriter instance with the update action configured.
+ */
+ def update(map: Map[String, Column]): MergeIntoWriter[T] = {
+ mergeIntoWriter.withNewMatchedAction(
+ UpdateAction(condition, map.map(x => Assignment(expr(x._1).expr, x._2.expr)).toSeq))
+ }
+
+ /**
+ * Specifies an action to delete matched rows from the DataFrame.
+ *
+ * @return The MergeIntoWriter instance with the delete action configured.
+ */
+ def delete(): MergeIntoWriter[T] = {
+ mergeIntoWriter.withNewMatchedAction(DeleteAction(condition))
+ }
+}
+
+/**
+ * A class for defining actions to be taken when no matching rows are found in a DataFrame
+ * during a merge operation.
+ *
+ * @param mergeIntoWriter The MergeIntoWriter instance responsible for writing data to a
+ *                        target DataFrame.
+ * @param condition       An optional condition Expression that specifies when the actions
+ *                        defined in this configuration should be applied. If the condition
+ *                        is None, the actions will be applied when there are no matching rows.
+ * @tparam T The type of data in the MergeIntoWriter.
+ */
+case class WhenNotMatched[T] private[sql](
+ mergeIntoWriter: MergeIntoWriter[T],
+ condition: Option[Expression]) {
+
+ /**
+ * Specifies an action to insert all non-matched rows into the DataFrame.
+ *
+ * @return The MergeIntoWriter instance with the insert all action configured.
+ */
+ def insertAll(): MergeIntoWriter[T] = {
+ mergeIntoWriter.withNewNotMatchedAction(InsertStarAction(condition))
+ }
+
+ /**
+ * Specifies an action to insert non-matched rows into the DataFrame with the provided
+ * column assignments.
+ *
+ * @param map A Map of column names to Column expressions representing the values to be inserted.
+ * @return The MergeIntoWriter instance with the insert action configured.
+ */
+ def insert(map: Map[String, Column]): MergeIntoWriter[T] = {
+ mergeIntoWriter.withNewNotMatchedAction(
+ InsertAction(condition, map.map(x => Assignment(expr(x._1).expr, x._2.expr)).toSeq))
+ }
+}
+
+
+/**
+ * A class for defining actions to be performed when there is no match by source
+ * during a merge operation in a MergeIntoWriter.
+ *
+ * @param mergeIntoWriter the MergeIntoWriter instance to which the merge actions will be applied.
+ * @param condition       an optional condition to be used with the merge actions.
+ * @tparam T              the type parameter for the MergeIntoWriter.
+ */
+case class WhenNotMatchedBySource[T] private[sql](
+ mergeIntoWriter: MergeIntoWriter[T],
+ condition: Option[Expression]) {
+
+ /**
+ * Specifies an action to update all non-matched rows in the target DataFrame when
+ * not matched by the source.
+ *
+ * @return The MergeIntoWriter instance with the update all action configured.
+ */
+ def updateAll(): MergeIntoWriter[T] = {
+ mergeIntoWriter.withNewNotMatchedBySourceAction(UpdateStarAction(condition))
+ }
+
+ /**
+ * Specifies an action to update non-matched rows in the target DataFrame with the provided
+ * column assignments when not matched by the source.
+ *
+ * @param map A Map of column names to Column expressions representing the updates to be applied.
+ * @return The MergeIntoWriter instance with the update action configured.
+ */
+ def update(map: Map[String, Column]): MergeIntoWriter[T] = {
+ mergeIntoWriter.withNewNotMatchedBySourceAction(
+ UpdateAction(condition, map.map(x => Assignment(expr(x._1).expr, x._2.expr)).toSeq))
+ }
+
+ /**
+ * Specifies an action to delete non-matched rows from the target DataFrame when not
+ * matched by the source.
+ *
+ * @return The MergeIntoWriter instance with the delete action configured.
+ */
+ def delete(): MergeIntoWriter[T] = {
+ mergeIntoWriter.withNewNotMatchedBySourceAction(DeleteAction(condition))
+ }
+}
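A sketch of how the builder above composes (target table and column names are illustrative). `update` and `insert` take `Map[String, Column]` assignments whose keys are parsed as target column expressions:
```
spark.table("source")
  .mergeInto("target", $"source.pk" === $"target.pk")
  .whenMatched($"target.salary" < 100)
  .update(Map("salary" -> $"source.salary"))             // update a subset of columns on match
  .whenNotMatched()
  .insert(Map("pk" -> $"source.pk", "salary" -> lit(0))) // explicit insert values
  .merge()
```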
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala
new file mode 100644
index 000000000000..ed44111c81d2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoDataFrameSuite.scala
@@ -0,0 +1,946 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions._
+
+class MergeIntoDataFrameSuite extends RowLevelOperationSuiteBase {
+
+ import testImplicits._
+
+ test("merge into empty table with NOT MATCHED clause") {
+ withTempView("source") {
+ createTable("pk INT NOT NULL, salary INT, dep STRING")
+
+ val sourceRows = Seq(
+ (1, 100, "hr"),
+ (2, 200, "finance"),
+ (3, 300, "hr"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"), // insert
+ Row(2, 200, "finance"), // insert
+ Row(3, 300, "hr"))) // insert
+ }
+ }
+
+ test("merge into empty table with conditional NOT MATCHED clause") {
+ withTempView("source") {
+ createTable("pk INT NOT NULL, salary INT, dep STRING")
+
+ val sourceRows = Seq(
+ (1, 100, "hr"),
+ (2, 200, "finance"),
+ (3, 300, "hr"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatched($"source.pk" >= 2)
+ .insertAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(2, 200, "finance"), // insert
+ Row(3, 300, "hr"))) // insert
+ }
+ }
+
+ test("merge into with conditional WHEN MATCHED clause (update)") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "corrupted" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, 100, "software"),
+ (2, 200, "finance"),
+ (3, 300, "software"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched($"source.pk" === 2)
+ .updateAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"), // unchanged
+ Row(2, 200, "finance"))) // update
+ }
+ }
+
+ test("merge into with conditional WHEN MATCHED clause (delete)") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "corrupted" }
+ |""".stripMargin)
+
+ Seq(1, 2, 3).toDF("pk").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched(col(tableNameAsString + ".salary") === 200)
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(1, 100, "hr"))) // unchanged
+ }
+ }
+
+ test("merge into with assignments to primary key in NOT MATCHED BY SOURCE") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "finance" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, 100, "software"),
+ (5, 500, "finance"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map(
+ tableNameAsString + ".salary" -> lit(-1)
+ ))
+ .whenNotMatchedBySource()
+ .update(Map(
+ tableNameAsString + ".pk" -> lit(-1)
+ ))
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, -1, "hr"), // update (matched)
+ Row(-1, 200, "finance"))) // update (not matched by source)
+ }
+ }
+
+ test("merge into with assignments to primary key in MATCHED") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "finance" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, 100, "software"),
+ (5, 500, "finance"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map(
+ tableNameAsString + ".pk" -> lit(-1)
+ ))
+ .whenNotMatchedBySource()
+ .update(Map(
+ tableNameAsString + ".salary" -> lit(-1)
+ ))
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(-1, 100, "hr"), // update (matched)
+ Row(2, -1, "finance"))) // update (not matched by source)
+ }
+ }
+
+ test("merge with all types of clauses") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |{ "pk": 4, "salary": 400, "dep": "hr" }
+ |{ "pk": 5, "salary": 500, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(3, 4, 5, 6).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map(
+ tableNameAsString + ".salary" -> col("cat.ns1.test_table.salary").plus(lit(1))
+ ))
+ .whenNotMatched()
+ .insert(Map(
+ "pk" -> col("source.pk"),
+ "salary" -> lit(0),
+ "dep" -> lit("new")
+ ))
+ .whenNotMatchedBySource()
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(3, 301, "hr"), // update
+ Row(4, 401, "hr"), // update
+ Row(5, 501, "hr"), // update
+ Row(6, 0, "new"))) // insert
+ }
+ }
+
+ test("merge with all types of clauses (update and insert star)") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, 101, "support"),
+ (2, 201, "support"),
+ (4, 401, "support"),
+ (5, 501, "support"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched(col(tableNameAsString + ".pk") === 1)
+ .updateAll()
+ .whenNotMatched($"source.pk" === 4)
+ .insertAll()
+ .whenNotMatchedBySource(
+ col(tableNameAsString + ".pk") === col(tableNameAsString + ".salary") / 100)
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 101, "support"), // update
+ Row(2, 200, "software"), // unchanged
+ Row(4, 401, "support"))) // insert
+ }
+ }
+
+ test("merge with all types of conditional clauses") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |{ "pk": 4, "salary": 400, "dep": "hr" }
+ |{ "pk": 5, "salary": 500, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(3, 4, 5, 6, 7).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched(col(tableNameAsString + ".pk") === 4)
+ .update(Map(
+ tableNameAsString + ".salary" -> col(tableNameAsString + ".salary").plus(lit(1))
+ ))
+ .whenNotMatched($"pk" === 6)
+ .insert(Map(
+ "pk" -> col("source.pk"),
+ "salary" -> lit(0),
+ "dep" -> lit("new")
+ ))
+ .whenNotMatchedBySource($"salary" === 100)
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(2, 200, "software"), // unchanged
+ Row(3, 300, "hr"), // unchanged
+ Row(4, 401, "hr"), // update
+ Row(5, 500, "hr"), // unchanged
+ Row(6, 0, "new"))) // insert
+ }
+ }
+
+ test("merge with one NOT MATCHED BY SOURCE clause") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(1, 2).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatchedBySource()
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"), // unchanged
+ Row(2, 200, "software"))) // unchanged
+ }
+ }
+
+ test("merge with one conditional NOT MATCHED BY SOURCE clause") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(2).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatchedBySource($"salary" === 100)
+ .update(Map(
+ "salary" -> lit(-1)
+ ))
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, -1, "hr"), // updated
+ Row(2, 200, "software"), // unchanged
+ Row(3, 300, "hr"))) // unchanged
+ }
+ }
+
+ test("merge with MATCHED and NOT MATCHED BY SOURCE clauses") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(2).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .delete()
+ .whenNotMatchedBySource($"salary" === 100)
+ .update(Map(
+ "salary" -> lit(-1)
+ ))
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, -1, "hr"), // updated
+ Row(3, 300, "hr"))) // unchanged
+ }
+ }
+
+ test("merge with NOT MATCHED and NOT MATCHED BY SOURCE clauses") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(2, 3, 4).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatched()
+ .insert(Map(
+ "pk" -> col("pk"),
+ "salary" -> lit(-1),
+ "dep" -> lit("new")
+ ))
+ .whenNotMatchedBySource()
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(2, 200, "software"), // unchanged
+ Row(3, 300, "hr"), // unchanged
+ Row(4, -1, "new"))) // insert
+ }
+ }
+
+ test("merge with multiple NOT MATCHED BY SOURCE clauses") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(5, 6, 7).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatchedBySource($"salary" === 100)
+ .update(Map(
+ "salary" -> col("salary").plus(lit(1))
+ ))
+ .whenNotMatchedBySource($"salary" === 300)
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 101, "hr"), // update
+ Row(2, 200, "software"))) // unchanged
+ }
+ }
+
+ test("merge with MATCHED BY SOURCE clause and NULL values") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, id INT, salary INT, dep STRING",
+ """{ "pk": 1, "id": null, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "id": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (2, 2, 201, "support"),
+ (1, 1, 101, "support"),
+ (3, 3, 301, "support"))
+ sourceRows.toDF("pk", "id", "salary",
"dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString,
+ $"source.id" === col(tableNameAsString + ".id") &&
(col(tableNameAsString + ".id") < 3))
+ .whenMatched()
+ .updateAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, null, 100, "hr"), // unchanged
+ Row(2, 2, 201, "support"), // update
+ Row(3, 3, 300, "hr"))) // unchanged
+ }
+ }
+
+ test("merge cardinality check with unconditional MATCHED clause (delete)") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 6, "salary": 600, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, 101, "support"),
+ (1, 102, "support"),
+ (2, 201, "support"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(6, 600, "software"))) // unchanged
+ }
+ }
+
+ test("merge cardinality check with only NOT MATCHED clauses") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 6, "salary": 600, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, 101, "support"),
+ (1, 102, "support"),
+ (2, 201, "support"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"), // unchanged
+ Row(2, 201, "support"), // insert
+ Row(6, 600, "software"))) // unchanged
+ }
+ }
+
+ test("merge with extra columns in source") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, "smth", 101, "support"),
+ (2, "smth", 201, "support"),
+ (4, "smth", 401, "support"))
+ sourceRows.toDF("pk", "extra", "salary",
"dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString +
".pk"))
+ .whenMatched()
+ .update(Map(
+ "salary" -> col("source.salary").plus(lit(1))
+ ))
+ .whenNotMatched()
+ .insert(Map(
+ "pk" -> col("source.pk"),
+ "salary" -> col("source.salary"),
+ "dep" -> col("source.dep")
+ ))
+ .whenNotMatchedBySource()
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 102, "hr"), // update
+ Row(2, 202, "software"), // update
+ Row(4, 401, "support"))) // insert
+ }
+ }
+
+ test("merge with NULL values in target and source") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, id INT, salary INT, dep STRING",
+ """{ "pk": 1, "id": null, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (5, None, 501, "support"),
+ (6, Some(6), 601, "support"))
+ sourceRows.toDF("pk", "id", "salary",
"dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString +
".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, null, 100, "hr"), // unchanged
+ Row(2, 2, 200, "software"), // unchanged
+ Row(5, null, 501, "support"), // insert
+ Row(6, 6, 601, "support"))) // insert
+ }
+ }
+
+ test("merge with <=>") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, id INT, salary INT, dep STRING",
+ """{ "pk": 1, "id": null, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (5, None, 501, "support"),
+ (6, Some(6), 601, "support"))
+ sourceRows.toDF("pk", "id", "salary",
"dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.id" <=> col(tableNameAsString +
".id"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(2, 2, 200, "software"), // unchanged
+ Row(5, null, 501, "support"), // updated
+ Row(6, 6, 601, "support"))) // insert
+ }
+ }
+
+ test("merge with NULL ON condition") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, id INT, salary INT, dep STRING",
+ """{ "pk": 1, "id": null, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (5, None, 501, "support"),
+ (6, Some(2), 201, "support"))
+ sourceRows.toDF("pk", "id", "salary",
"dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString +
".pk") && lit(null))
+ .whenMatched()
+ .update(Map(
+ "salary" -> col("source.salary")
+ ))
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, null, 100, "hr"), // unchanged
+ Row(2, 2, 200, "software"), // unchanged
+ Row(5, null, 501, "support"), // new
+ Row(6, 2, 201, "support"))) // new
+ }
+ }
+
+ test("merge with NULL clause conditions") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, 101, "support"),
+ (3, 301, "support"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched(lit(null))
+ .update(Map(
+ "salary" -> col("source.salary")
+ ))
+ .whenNotMatched(lit(null))
+ .insertAll()
+ .whenNotMatchedBySource(lit(null))
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 100, "hr"), // unchanged
+ Row(2, 200, "software"))) // unchanged
+ }
+ }
+
+ test("merge with multiple matching clauses") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ (1, 101, "support"),
+ (3, 301, "support"))
+ sourceRows.toDF("pk", "salary", "dep").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched(col(tableNameAsString + ".pk") === 1)
+ .update(Map(
+ "salary" -> col(tableNameAsString + ".salary").plus(lit(5))
+ ))
+ .whenMatched(col(tableNameAsString + ".salary") === 100)
+ .update(Map(
+ "salary" -> col(tableNameAsString + ".salary").plus(lit(2))
+ ))
+ .whenNotMatchedBySource(col(tableNameAsString + ".pk") === 2)
+ .update(Map(
+ "salary" -> col("salary").minus(lit(1))
+ ))
+ .whenNotMatchedBySource(col(tableNameAsString + ".salary") === 200)
+ .delete()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 105, "hr"), // updated (matched)
+ Row(2, 199, "software"))) // updated (not matched by source)
+ }
+ }
+
+ test("merge resolves and aligns columns by name") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceRows = Seq(
+ ("support", 1, 101),
+ ("support", 3, 301))
+ sourceRows.toDF("dep", "pk", "salary").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 101, "support"), // update
+ Row(2, 200, "software"), // unchanged
+ Row(3, 301, "support"))) // insert
+ }
+ }
+
+ test("merge refreshed relation cache") {
+ withTempView("temp", "source") {
+ withCache("temp") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 100, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ // define a view on top of the table
+ val query = sql(s"SELECT * FROM $tableNameAsString WHERE salary = 100")
+ query.createOrReplaceTempView("temp")
+
+ // cache the view
+ sql("CACHE TABLE temp")
+
+ // verify the view returns expected results
+ checkAnswer(
+ sql("SELECT * FROM temp"),
+ Row(1, 100, "hr") :: Row(2, 100, "software") :: Nil)
+
+ val sourceRows = Seq(
+ ("support", 1, 101),
+ ("support", 3, 301))
+ sourceRows.toDF("dep", "pk",
"salary").createOrReplaceTempView("source")
+
+ // merge changes into the table
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString
+ ".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+
+ // verify the merge was successful
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 101, "support"), // update
+ Row(2, 100, "software"), // unchanged
+ Row(3, 301, "support"))) // insert
+
+ // verify the view reflects the changes in the table
+ checkAnswer(sql("SELECT * FROM temp"), Row(2, 100, "software") :: Nil)
+ }
+ }
+ }
+
+ test("merge with updates to nested struct fields in MATCHED clauses") {
+ withTempView("source") {
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } } }, "dep": "hr" }""")
+
+ Seq(1, 3).toDF("pk").createOrReplaceTempView("source")
+
+ // update primitive, array, map columns inside a struct
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map(
+ "s.c1" -> lit(-1),
+ "s.c2.m" -> map(lit('k'), lit('v')),
+ "s.c2.a" -> array(lit(-1))
+ ))
+ .merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"))), "hr")))
+
+ // set primitive, array, map columns to NULL (proper casts should be inserted)
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map(
+ "s.c1" -> lit(null),
+ "s.c2" -> lit(null)
+ ))
+ .merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Row(1, Row(null, null), "hr") :: Nil)
+
+ // assign an entire struct
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map(
+ "s" -> struct(
+ lit(1).as("c1"),
+ struct(array(lit(1)).as("a"), lit(null).as("m")).as("c2"))
+ ))
+ .merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Row(1, Row(1, Row(Seq(1), null)), "hr") :: Nil)
+ }
+ }
+
+ test("merge with updates to nested struct fields in NOT MATCHED BY SOURCE
clauses") {
+ withTempView("source") {
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING, STRING>>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" } } }, "dep": "hr" }""")
+
+ Seq(2, 4).toDF("pk").createOrReplaceTempView("source")
+
+ // update primitive, array, map columns inside a struct
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatchedBySource()
+ .update(Map(
+ "s.c1" -> lit(-1),
+ "s.c2.m" -> map(lit('k'), lit('v')),
+ "s.c2.a" -> array(lit(-1))
+ ))
+ .merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"))), "hr")))
+
+ // set primitive, array, map columns to NULL (proper casts should be inserted)
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatchedBySource()
+ .update(Map(
+ "s.c1" -> lit(null),
+ "s.c2" -> lit(null)
+ ))
+ .merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Row(1, Row(null, null), "hr") :: Nil)
+
+ // assign an entire struct
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenNotMatchedBySource()
+ .update(Map(
+ "s" -> struct(
+ lit(1).as("c1"),
+ struct(array(lit(1)).as("a"), lit(null).as("m")).as("c2"))
+ ))
+ .merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Row(1, Row(1, Row(Seq(1), null)), "hr") :: Nil)
+ }
+ }
+
+ test("merge with char/varchar columns") {
+ withTempView("source") {
+ createTable("pk INT NOT NULL, s STRUCT<n_c: CHAR(3), n_vc: VARCHAR(5)>,
dep STRING")
+
+ append("pk INT NOT NULL, s STRUCT<n_c: STRING, n_vc: STRING>, dep
STRING",
+ """{ "pk": 1, "s": { "n_c": "aaa", "n_vc": "aaa" }, "dep": "hr" }
+ |{ "pk": 2, "s": { "n_c": "bbb", "n_vc": "bbb" }, "dep": "software" }
+ |{ "pk": 3, "s": { "n_c": "ccc", "n_vc": "ccc" }, "dep": "hr" }
+ |""".stripMargin)
+
+ Seq(1, 2, 4).toDF("pk").createOrReplaceTempView("source")
+
+ spark.table("source")
+ .mergeInto(tableNameAsString, $"source.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map(
+ "s.n_c" -> lit("x1"),
+ "s.n_vc" -> lit("x2")
+ ))
+ .whenNotMatchedBySource()
+ .update(Map(
+ "s.n_c" -> lit("y1"),
+ "s.n_vc" -> lit("y2")
+ ))
+ .merge()
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row("x1 ", "x2"), "hr"), // update (matched)
+ Row(2, Row("x1 ", "x2"), "software"), // update (matched)
+ Row(3, Row("y1 ", "y2"), "hr"))) // update (not matched by source)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]