This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 23d925399424 [SPARK-54525][SQL] Disable nested struct coercion in MERGE INTO under a config
23d925399424 is described below
commit 23d9253994240732ebf2df2867d3ceb4d1a783a4
Author: Szehon Ho <[email protected]>
AuthorDate: Sat Nov 29 13:02:32 2025 -0800
[SPARK-54525][SQL] Disable nested struct coercion in MERGE INTO under a config
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/52225 allowed MERGE INTO to support the case where the assignment value is a struct with fewer fields than the assignment key, i.e., UPDATE SET big_struct = source.small_struct.
This PR turns that feature off by default and gates it behind a config.
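For illustration, a minimal sketch of the gated behavior (the config key comes from this PR; the table and struct names are hypothetical):

```sql
-- Hypothetical schemas:
--   target.big_struct   STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>
--   source.small_struct STRUCT<c1: INT, c2: STRING>  -- missing c3

-- Off by default after this change; must be enabled explicitly:
SET spark.sql.merge.nested.type.coercion.enabled = true;

MERGE INTO target t
USING source s
ON t.pk = s.pk
WHEN MATCHED THEN
  UPDATE SET t.big_struct = s.small_struct;  -- coerced only while the config is on
```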
### Why are the changes needed?
The change raised some interesting questions; for example, there is some ambiguity in user intent. Does UPDATE SET * mean setting all nested fields, or setting the top-level columns? In the first case, missing fields are kept; in the second, missing fields are nullified.
I tried to make a choice in https://github.com/apache/spark/pull/53149, but after some feedback it may be a bit controversial to pick one interpretation over the other. A SQLConf may not be the right choice, and we may instead need to introduce new syntax, which requires more discussion.
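To make the ambiguity concrete, a sketch with a hypothetical target column s STRUCT<c1: INT, c2: INT> and a source whose s only has c1:

```sql
MERGE INTO target t
USING source src
ON t.pk = src.pk
WHEN MATCHED THEN
  UPDATE SET *;
-- Interpretation 1 (per nested field): t.s.c1 = src.s.c1 and t.s.c2 keeps its old value.
-- Interpretation 2 (top-level column): t.s = src.s, so the missing t.s.c2 becomes NULL.
```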
### Does this PR introduce _any_ user-facing change?
No, this feature is unreleased.
### How was this patch tested?
Existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53229 from szehon-ho/disable_merge_update_source_coercion.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/catalyst/analysis/AssignmentUtils.scala | 139 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 5 +-
.../sql/connector/MergeIntoTableSuiteBase.scala | 2491 +++++++++++---------
3 files changed, 1379 insertions(+), 1256 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
index 185dc5ec54f6..c9ed5b86dbde 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.TableOutputResolver.DefaultValueFillMode.{NONE, RECURSE}
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, Literal}
-import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
+import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Expression, GetStructField, IsNull, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Assignment
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -182,31 +181,11 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
    } else if (exactAssignments.isEmpty && fieldAssignments.isEmpty) {
      TableOutputResolver.checkNullability(colExpr, col, conf, colPath)
    } else if (exactAssignments.nonEmpty) {
-      if (updateStar) {
-        val value = exactAssignments.head.value
-        col.dataType match {
-          case structType: StructType =>
-            // Expand assignments to leaf fields
-            val structAssignment =
-              applyNestedFieldAssignments(col, colExpr, value, addError, colPath,
-                coerceNestedTypes)
-
-            // Wrap with null check for missing source fields
-            fixNullExpansion(col, value, structType, structAssignment,
-              colPath, addError)
-          case _ =>
-            // For non-struct types, resolve directly
-            val coerceMode = if (coerceNestedTypes) RECURSE else NONE
-            TableOutputResolver.resolveUpdate("", value, col, conf, addError, colPath,
-              coerceMode)
-        }
-      } else {
-        val value = exactAssignments.head.value
-        val coerceMode = if (coerceNestedTypes) RECURSE else NONE
-        val resolvedValue = TableOutputResolver.resolveUpdate("", value, col, conf, addError,
-          colPath, coerceMode)
-        resolvedValue
-      }
+      val value = exactAssignments.head.value
+      val coerceMode = if (coerceNestedTypes) RECURSE else NONE
+      val resolvedValue = TableOutputResolver.resolveUpdate("", value, col, conf, addError,
+        colPath, coerceMode)
+      resolvedValue
    } else {
      applyFieldAssignments(col, colExpr, fieldAssignments, addError, colPath,
        coerceNestedTypes)
    }
@@ -240,63 +219,6 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
    }
  }
-  private def applyNestedFieldAssignments(
-      col: Attribute,
-      colExpr: Expression,
-      value: Expression,
-      addError: String => Unit,
-      colPath: Seq[String],
-      coerceNestedTyptes: Boolean): Expression = {
-
-    col.dataType match {
-      case structType: StructType =>
-        val fieldAttrs = DataTypeUtils.toAttributes(structType)
-
-        val updatedFieldExprs = fieldAttrs.zipWithIndex.map { case (fieldAttr, ordinal) =>
-          val fieldPath = colPath :+ fieldAttr.name
-          val targetFieldExpr = GetStructField(colExpr, ordinal, Some(fieldAttr.name))
-
-          // Try to find a corresponding field in the source value by name
-          val sourceFieldValue: Expression = value.dataType match {
-            case valueStructType: StructType =>
-              valueStructType.fields.find(f => conf.resolver(f.name, fieldAttr.name)) match {
-                case Some(matchingField) =>
-                  // Found matching field in source, extract it
-                  val fieldIndex = valueStructType.fieldIndex(matchingField.name)
-                  GetStructField(value, fieldIndex, Some(matchingField.name))
-                case None =>
-                  // Field doesn't exist in source, use target's current value with null check
-                  TableOutputResolver.checkNullability(targetFieldExpr, fieldAttr, conf, fieldPath)
-              }
-            case _ =>
-              // Value is not a struct, cannot extract field
-              addError(s"Cannot assign non-struct value to struct field '${fieldPath.quoted}'")
-              Literal(null, fieldAttr.dataType)
-          }
-
-          // Recurse or resolve based on field type
-          fieldAttr.dataType match {
-            case nestedStructType: StructType =>
-              // Field is a struct, recurse
-              applyNestedFieldAssignments(fieldAttr, targetFieldExpr, sourceFieldValue,
-                addError, fieldPath, coerceNestedTyptes)
-            case _ =>
-              // Field is not a struct, resolve with TableOutputResolver
-              val coerceMode = if (coerceNestedTyptes) RECURSE else NONE
-              TableOutputResolver.resolveUpdate("", sourceFieldValue, fieldAttr, conf, addError,
-                fieldPath, coerceMode)
-          }
-        }
-        toNamedStruct(structType, updatedFieldExprs)
-
-      case otherType =>
-        addError(
-          "Updating nested fields is only supported for StructType but " +
-          s"'${colPath.quoted}' is of type $otherType")
-        colExpr
-    }
-  }
-
  private def toNamedStruct(structType: StructType, fieldExprs: Seq[Expression]): Expression = {
    val namedStructExprs = structType.fields.zip(fieldExprs).flatMap { case (field, expr) =>
      Seq(Literal(field.name), expr)
@@ -350,55 +272,6 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
    IsNull(currentExpr)
  }
-  /**
-   * As UPDATE SET * can assign struct fields individually (preserving existing fields),
-   * this will lead to null expansion, ie, a struct is created where all fields are null.
-   * Wraps a struct assignment with null checks for the source and missing source fields.
-   * Return null if all are null.
-   *
-   * @param col the target column attribute
-   * @param value the source value expression
-   * @param structType the target struct type
-   * @param structAssignment the struct assignment result to wrap
-   * @param colPath the column path for error reporting
-   * @param addError error reporting function
-   * @return the wrapped expression with null checks
-   */
-  private def fixNullExpansion(
-      col: Attribute,
-      value: Expression,
-      structType: StructType,
-      structAssignment: Expression,
-      colPath: Seq[String],
-      addError: String => Unit): Expression = {
-    // As StoreAssignmentPolicy.LEGACY is not allowed in DSv2, always add null check for
-    // non-nullable column
-    if (!col.nullable) {
-      AssertNotNull(value)
-    } else {
-      // Check if source struct is null
-      val valueIsNull = IsNull(value)
-
-      // Check if missing source paths (paths in target but not in source) are not null
-      // These will be null for the case of UPDATE SET * and
-      val missingSourcePaths = getMissingSourcePaths(structType, value.dataType, colPath, addError)
-      val condition = if (missingSourcePaths.nonEmpty) {
-        // Check if all target attributes at missing source paths are null
-        val missingFieldNullChecks = missingSourcePaths.map { path =>
-          createNullCheckForFieldPath(col, path)
-        }
-        // Combine all null checks with AND
-        val allMissingFieldsNull = missingFieldNullChecks.reduce[Expression]((a, b) => And(a, b))
-        And(valueIsNull, allMissingFieldsNull)
-      } else {
-        valueIsNull
-      }
-
-      // Return: If (condition) THEN NULL ELSE structAssignment
-      If(condition, Literal(null, structAssignment.dataType), structAssignment)
-    }
-  }
-
/**
* Checks whether assignments are aligned and compatible with table columns.
*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1aa97cad0bbe..d6fa8b21b21a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -6707,10 +6707,11 @@ object SQLConf {
    buildConf("spark.sql.merge.nested.type.coercion.enabled")
      .internal()
      .doc("If enabled, allow MERGE INTO to coerce source nested types if they have less" +
-        "nested fields than the target table's nested types.")
+        "nested fields than the target table's nested types. This is experimental and" +
+        "the semantics may change.")
      .version("4.1.0")
      .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
/**
* Holds information about keys that have been deprecated.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 680fa63e0929..7539506e8bfe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -3171,222 +3171,275 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
test("merge into schema evolution replace column with nested struct and set
explicit columns") {
Seq(true, false).foreach { withSchemaEvolution =>
- withTempView("source") {
- createAndInitTable(
- s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
- |dep STRING""".stripMargin,
- """{ "pk": 1, "s": { "c1": 2, "c2": { "a": [1,2], "m": { "a": "b" }
} }, "dep": "hr" }""")
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>,
+ | m: MAP<STRING, STRING>>>,
+ |dep STRING)""".stripMargin)
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- // removed column 'a'
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType) // new column
- )))
- ))),
- StructField("dep", StringType)
- ))
- val data = Seq(
- Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
+ StructField("m", MapType(StringType, StringType))
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val targetData = Seq(
+ Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
+ )
+ spark.createDataFrame(
+ spark.sparkContext.parallelize(targetData), targetSchema)
+ .writeTo(tableNameAsString).append()
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET s.c1 = -1, s.c2.m = map('k', 'v'), s.c2.a =
array(-1),
- | s.c2.c3 = src.s.c2.c3
- |WHEN NOT MATCHED THEN
- | INSERT (pk, s, dep) VALUES (src.pk,
- | named_struct('c1', src.s.c1,
- | 'c2', named_struct('a', array(-2), 'm', map('g', 'h'),
'c3', true)), src.dep)
- |""".stripMargin
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val data = Seq(
+ Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)), "hr"),
- Row(2, Row(20, Row(Seq(-2), Map("g" -> "h"), true)),
"engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET s.c1 = -1, s.c2.m = map('k', 'v'), s.c2.a =
array(-1),
+ | s.c2.c3 = src.s.c2.c3
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, s, dep) VALUES (src.pk,
+ | named_struct('c1', src.s.c1,
+ | 'c2', named_struct('a', array(-2), 'm', map('g', 'h'),
'c3', true)), src.dep)
+ |""".stripMargin
+
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)),
"hr"),
+ Row(2, Row(20, Row(Seq(-2), Map("g" -> "h"), true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get == "FIELD_NOT_FOUND")
+ assert(exception.getMessage.contains("No such struct field `c3`
in `a`, `m`. "))
+ }
}
- assert(exception.errorClass.get == "FIELD_NOT_FOUND")
- assert(exception.getMessage.contains("No such struct field `c3` in
`a`, `m`. "))
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
test("merge into schema evolution replace column with nested struct and set
all columns") {
Seq(true, false).foreach { withSchemaEvolution =>
- withTempView("source") {
- // Create table using Spark SQL
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
- |dep STRING)
- |PARTITIONED BY (dep)
- |""".stripMargin)
- // Insert data using DataFrame API with objects
- val tableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType))
- )))
- ))),
- StructField("dep", StringType)
- ))
- val targetData = Seq(
- Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(targetData),
tableSchema)
- .coalesce(1).writeTo(tableNameAsString).append()
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Create table using Spark SQL
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
+ |dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
+ // Insert data using DataFrame API with objects
+ val tableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
+ StructField("m", MapType(StringType, StringType))
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val targetData = Seq(
+ Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData),
tableSchema)
+ .coalesce(1).writeTo(tableNameAsString).append()
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- // missing column 'a'
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType) // new column
- )))
- ))),
- StructField("dep", StringType)
- ))
- val sourceData = Seq(
- Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ // missing column 'a'
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType) // new column
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val sourceData = Seq(
+ Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)),
"sales"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ }
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`a`"))
+ }
}
- assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
test("merge into schema evolution replace column with nested struct and
update " +
"top level struct") {
Seq(true, false).foreach { withSchemaEvolution =>
- withTempView("source") {
- // Create table using Spark SQL
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
- |dep STRING)
- |PARTITIONED BY (dep)
- |""".stripMargin)
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Create table using Spark SQL
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
+ |dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
- // Insert data using DataFrame API with objects
- val tableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType))
- )))
- ))),
- StructField("dep", StringType)
- ))
- val targetData = Seq(
- Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(targetData),
tableSchema)
- .coalesce(1).writeTo(tableNameAsString).append()
+ // Insert data using DataFrame API with objects
+ val tableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
+ StructField("m", MapType(StringType, StringType))
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val targetData = Seq(
+ Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData),
tableSchema)
+ .coalesce(1).writeTo(tableNameAsString).append()
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- // missing column 'a'
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType) // new column
- )))
- ))),
- StructField("dep", StringType)
- ))
- val sourceData = Seq(
- Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ // missing column 'a'
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType) // new column
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val sourceData = Seq(
+ Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET s = src.s
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET s = src.s
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ }
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`a`"))
+ }
}
- assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
@@ -3513,249 +3566,309 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
test("merge into schema evolution replace column for struct in map and set
all columns") {
Seq(true, false).foreach { withSchemaEvolution =>
- withTempView("source") {
- val schema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c2",
IntegerType))),
- StructType(Seq(StructField("c4", StringType), StructField("c5",
StringType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(schema))
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ val schema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c2", IntegerType))),
+ StructType(Seq(StructField("c4", StringType),
StructField("c5", StringType))))),
+ StructField("dep", StringType)))
+ createTable(CatalogV2Util.structTypeToV2Columns(schema))
- val data = Seq(
- Row(0, Map(Row(10, 10) -> Row("c", "c")), "hr"),
- Row(1, Map(Row(20, 20) -> Row("d", "d")), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
- .writeTo(tableNameAsString).append()
+ val data = Seq(
+ Row(0, Map(Row(10, 10) -> Row("c", "c")), "hr"),
+ Row(1, Map(Row(20, 20) -> Row("d", "d")), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+ .writeTo(tableNameAsString).append()
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c3",
BooleanType))),
- StructType(Seq(StructField("c4", StringType), StructField("c6",
BooleanType))))),
- StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Map(Row(10, true) -> Row("y", false)), "sales"),
- Row(2, Map(Row(20, false) -> Row("z", true)), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c3", BooleanType))),
+ StructType(Seq(StructField("c4", StringType),
StructField("c6", BooleanType))))),
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Map(Row(10, true) -> Row("y", false)), "sales"),
+ Row(2, Map(Row(20, false) -> Row("z", true)), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), "hr"),
- Row(1, Map(Row(10, null, true) -> Row("y", null, false)),
"sales"),
- Row(2, Map(Row(20, null, false) -> Row("z", null, true)),
"engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)),
"hr"),
+ Row(1, Map(Row(10, null, true) -> Row("y", null, false)),
"sales"),
+ Row(2, Map(Row(20, null, false) -> Row("z", null, true)),
"engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `m`.`key`"))
+ }
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `m`.`key`.`c2`"))
+ }
}
- assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `m`.`key`"))
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
test("merge into schema evolution replace column for struct in map and set
explicit columns") {
Seq(true, false).foreach { withSchemaEvolution =>
- withTempView("source") {
- val schema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c2",
IntegerType))),
- StructType(Seq(StructField("c4", StringType), StructField("c5",
StringType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(schema))
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ val schema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c2", IntegerType))),
+ StructType(Seq(StructField("c4", StringType),
StructField("c5", StringType))))),
+ StructField("dep", StringType)))
+ createTable(CatalogV2Util.structTypeToV2Columns(schema))
- val data = Seq(
- Row(0, Map(Row(10, 10) -> Row("c", "c")), "hr"),
- Row(1, Map(Row(20, 20) -> Row("d", "d")), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
- .writeTo(tableNameAsString).append()
+ val data = Seq(
+ Row(0, Map(Row(10, 10) -> Row("c", "c")), "hr"),
+ Row(1, Map(Row(20, 20) -> Row("d", "d")), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+ .writeTo(tableNameAsString).append()
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c3",
BooleanType))),
- StructType(Seq(StructField("c4", StringType), StructField("c6",
BooleanType))))),
- StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Map(Row(10, true) -> Row("y", false)), "sales"),
- Row(2, Map(Row(20, false) -> Row("z", true)), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c3", BooleanType))),
+ StructType(Seq(StructField("c4", StringType),
StructField("c6", BooleanType))))),
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Map(Row(10, true) -> Row("y", false)), "sales"),
+ Row(2, Map(Row(20, false) -> Row("z", true)), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET t.m = src.m, t.dep = 'my_old_dep'
- |WHEN NOT MATCHED THEN
- | INSERT (pk, m, dep) VALUES (src.pk, src.m, 'my_new_dep')
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET t.m = src.m, t.dep = 'my_old_dep'
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, m, dep) VALUES (src.pk, src.m, 'my_new_dep')
+ |""".stripMargin
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), "hr"),
- Row(1, Map(Row(10, null, true) -> Row("y", null, false)),
"my_old_dep"),
- Row(2, Map(Row(20, null, false) -> Row("z", null, true)),
"my_new_dep")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)),
"hr"),
+ Row(1, Map(Row(10, null, true) -> Row("y", null, false)),
"my_old_dep"),
+ Row(2, Map(Row(20, null, false) -> Row("z", null, true)),
"my_new_dep")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `m`.`key`"))
+ }
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `m`.`key`.`c2`"))
+ }
}
- assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `m`.`key`"))
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
test("merge into schema evolution replace column for struct in array and set
all columns") {
Seq(true, false).foreach { withSchemaEvolution =>
- withTempView("source") {
- val schema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("a", ArrayType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c2",
IntegerType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(schema))
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ val schema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("a", ArrayType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c2", IntegerType))))),
+ StructField("dep", StringType)))
+ createTable(CatalogV2Util.structTypeToV2Columns(schema))
- val data = Seq(
- Row(0, Array(Row(10, 10)), "hr"),
- Row(1, Array(Row(20, 20)), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
- .writeTo(tableNameAsString).append()
+ val data = Seq(
+ Row(0, Array(Row(10, 10)), "hr"),
+ Row(1, Array(Row(20, 20)), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+ .writeTo(tableNameAsString).append()
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("a", ArrayType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c3",
BooleanType))))),
- StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Array(Row(10, true)), "sales"),
- Row(2, Array(Row(20, false)), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("a", ArrayType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c3", BooleanType))))),
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Array(Row(10, true)), "sales"),
+ Row(2, Array(Row(20, false)), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Array(Row(10, 10, null)), "hr"),
- Row(1, Array(Row(10, null, true)), "sales"),
- Row(2, Array(Row(20, null, false)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Array(Row(10, 10, null)), "hr"),
+ Row(1, Array(Row(10, null, true)), "sales"),
+ Row(2, Array(Row(20, null, false)), "engineering")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct
`a`.`element`"))
+ }
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `a`.`element`.`c2`"))
+ }
}
- assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `a`.`element`"))
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
test("merge into schema evolution replace column for struct in array and set
explicit columns") {
Seq(true, false).foreach { withSchemaEvolution =>
- withTempView("source") {
- val schema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("a", ArrayType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c2",
IntegerType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(schema))
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ val schema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("a", ArrayType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c2", IntegerType))))),
+ StructField("dep", StringType)))
+ createTable(CatalogV2Util.structTypeToV2Columns(schema))
- val data = Seq(
- Row(0, Array(Row(10, 10)), "hr"),
- Row(1, Array(Row(20, 20)), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
- .writeTo(tableNameAsString).append()
+ val data = Seq(
+ Row(0, Array(Row(10, 10)), "hr"),
+ Row(1, Array(Row(20, 20)), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+ .writeTo(tableNameAsString).append()
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("a", ArrayType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c3",
BooleanType))))),
- StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Array(Row(10, true)), "sales"),
- Row(2, Array(Row(20, false)), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("a", ArrayType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c3", BooleanType))))),
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Array(Row(10, true)), "sales"),
+ Row(2, Array(Row(20, false)), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
- val mergeStmt =
- s"""MERGE $schemaEvolutionClause
- |INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET t.a = src.a, t.dep = 'my_old_dep'
- |WHEN NOT MATCHED THEN
- | INSERT (pk, a, dep) VALUES (src.pk, src.a, 'my_new_dep')
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET t.a = src.a, t.dep = 'my_old_dep'
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, a, dep) VALUES (src.pk, src.a, 'my_new_dep')
+ |""".stripMargin
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Array(Row(10, 10, null)), "hr"),
- Row(1, Array(Row(10, null, true)), "my_old_dep"),
- Row(2, Array(Row(20, null, false)), "my_new_dep")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Array(Row(10, 10, null)), "hr"),
+ Row(1, Array(Row(10, null, true)), "my_old_dep"),
+ Row(2, Array(Row(20, null, false)), "my_new_dep")))
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct
`a`.`element`"))
+ }
+ } else {
+ val exception =
intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `a`.`element`.`c2`"))
+ }
}
- assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `a`.`element`"))
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
test("merge into empty table with NOT MATCHED clause schema evolution") {
@@ -4447,200 +4560,268 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
test("merge into with source missing fields in struct nested in array") {
- withTempView("source") {
- // Target table has struct with 3 fields (c1, c2, c3) in array
- createAndInitTable(
- s"""pk INT NOT NULL,
- |a ARRAY<STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "a": [ { "c1": 1, "c2": "a", "c3": true } ], "dep":
"sales" }
- |{ "pk": 1, "a": [ { "c1": 2, "c2": "b", "c3": false } ], "dep":
"sales" }"""
- .stripMargin)
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 3 fields (c1, c2, c3) in array
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |a ARRAY<STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "a": [ { "c1": 1, "c2": "a", "c3": true } ], "dep":
"sales" }
+ |{ "pk": 1, "a": [ { "c1": 2, "c2": "b", "c3": false } ],
"dep": "sales" }"""
+ .stripMargin)
- // Source table has struct with only 2 fields (c1, c2) - missing c3
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("a", ArrayType(
- StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType))))), // missing c3 field
- StructField("dep", StringType)))
- val data = Seq(
- Row(1, Array(Row(10, "c")), "hr"),
- Row(2, Array(Row(30, "e")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 2 fields (c1, c2) - missing c3
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("a", ArrayType(
+ StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))), // missing c3 field
+ StructField("dep", StringType)))
+ val data = Seq(
+ Row(1, Array(Row(10, "c")), "hr"),
+ Row(2, Array(Row(30, "e")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- sql(
- s"""MERGE INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin)
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Array(Row(1, "a", true)), "sales"),
- Row(1, Array(Row(10, "c", null)), "hr"),
- Row(2, Array(Row(30, "e", null)), "engineering")))
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Array(Row(1, "a", true)), "sales"),
+ Row(1, Array(Row(10, "c", null)), "hr"),
+ Row(2, Array(Row(30, "e", null)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `a`.`element`.`c3`."))
+ }
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
test("merge into with source missing fields in struct nested in map key") {
- withTempView("source") {
- // Target table has struct with 2 fields in map key
- val targetSchema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c2",
BooleanType))),
- StructType(Seq(StructField("c3", StringType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 2 fields in map key
+ val targetSchema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType),
StructField("c2", BooleanType))),
+ StructType(Seq(StructField("c3", StringType))))),
+ StructField("dep", StringType)))
+ createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
- val targetData = Seq(
- Row(0, Map(Row(10, true) -> Row("x")), "hr"),
- Row(1, Map(Row(20, false) -> Row("y")), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
- .writeTo(tableNameAsString).append()
+ val targetData = Seq(
+ Row(0, Map(Row(10, true) -> Row("x")), "hr"),
+ Row(1, Map(Row(20, false) -> Row("y")), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
+ .writeTo(tableNameAsString).append()
- // Source table has struct with only 1 field (c1) in map key - missing c2
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType))), // missing c2
- StructType(Seq(StructField("c3", StringType))))),
- StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Map(Row(10) -> Row("z")), "sales"),
- Row(2, Map(Row(20) -> Row("w")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 1 field (c1) in map key -
missing c2
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType))), // missing c2
+ StructType(Seq(StructField("c3", StringType))))),
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Map(Row(10) -> Row("z")), "sales"),
+ Row(2, Map(Row(20) -> Row("w")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- sql(
- s"""MERGE INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin)
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- // Missing field c2 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Map(Row(10, true) -> Row("x")), "hr"),
- Row(1, Map(Row(10, null) -> Row("z")), "sales"),
- Row(2, Map(Row(20, null) -> Row("w")), "engineering")))
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ // Missing field c2 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Map(Row(10, true) -> Row("x")), "hr"),
+ Row(1, Map(Row(10, null) -> Row("z")), "sales"),
+ Row(2, Map(Row(20, null) -> Row("w")), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `m`.`key`.`c2`."))
+ }
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
test("merge into with source missing fields in struct nested in map value") {
- withTempView("source") {
- // Target table has struct with 2 fields in map value
- val targetSchema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType))),
- StructType(Seq(StructField("c1", StringType), StructField("c2",
BooleanType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 2 fields in map value
+ val targetSchema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType))),
+ StructType(Seq(StructField("c1", StringType),
StructField("c2", BooleanType))))),
+ StructField("dep", StringType)))
+ createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
- val targetData = Seq(
- Row(0, Map(Row(10) -> Row("x", true)), "hr"),
- Row(1, Map(Row(20) -> Row("y", false)), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
- .writeTo(tableNameAsString).append()
+ val targetData = Seq(
+ Row(0, Map(Row(10) -> Row("x", true)), "hr"),
+ Row(1, Map(Row(20) -> Row("y", false)), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData),
targetSchema)
+ .writeTo(tableNameAsString).append()
- // Source table has struct with only 1 field (c1) in map value - missing
c2
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType))),
- StructType(Seq(StructField("c1", StringType))))), // missing c2
- StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Map(Row(10) -> Row("z")), "sales"),
- Row(2, Map(Row(20) -> Row("w")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 1 field (c1) in map value -
missing c2
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType))),
+ StructType(Seq(StructField("c1", StringType))))), // missing c2
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Map(Row(10) -> Row("z")), "sales"),
+ Row(2, Map(Row(20) -> Row("w")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData),
sourceTableSchema)
+ .createOrReplaceTempView("source")
- sql(
- s"""MERGE INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin)
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- // Missing field c2 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Map(Row(10) -> Row("x", true)), "hr"),
- Row(1, Map(Row(10) -> Row("z", null)), "sales"),
- Row(2, Map(Row(20) -> Row("w", null)), "engineering")))
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ // Missing field c2 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Map(Row(10) -> Row("x", true)), "hr"),
+ Row(1, Map(Row(10) -> Row("z", null)), "sales"),
+ Row(2, Map(Row(20) -> Row("w", null)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `m`.`value`.`c2`."))
+ }
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
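
Note on the shared test pattern: every suite touched here gates one MERGE statement on the new flag and branches the expectation. A minimal, self-contained sketch of that pattern, assuming a ScalaTest-style suite with the withSQLConf/intercept helpers used above and a mergeStmt string like the ones in this diff:

Seq(true, false).foreach { coerceNestedTypes =>
  withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
      coerceNestedTypes.toString) {
    if (coerceNestedTypes) {
      // coercion on: missing nested fields in the source are NULL-filled
      sql(mergeStmt)
    } else {
      // coercion off: analysis rejects the narrower source struct
      val e = intercept[org.apache.spark.sql.AnalysisException](sql(mergeStmt))
      assert(e.errorClass.contains("INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA"))
    }
  }
}
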
test("merge into with source missing fields in top-level struct") {
- withTempView("source") {
- // Target table has struct with 3 fields at top level
- createAndInitTable(
- s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": true }, "dep":
"sales"}""")
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 3 fields at top level
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": true }, "dep":
"sales"}""")
- // Source table has struct with only 2 fields (c1, c2) - missing c3
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))), // missing c3 field
- StructField("dep", StringType)))
- val data = Seq(
- Row(1, Row(10, "b"), "hr"),
- Row(2, Row(20, "c"), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 2 fields (c1, c2) - missing c3
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))), // missing c3 field
+ StructField("dep", StringType)))
+ val data = Seq(
+ Row(1, Row(10, "b"), "hr"),
+ Row(2, Row(20, "c"), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source")
- sql(
- s"""MERGE INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin)
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, "a", true), "sales"),
- Row(1, Row(10, "b", null), "hr"),
- Row(2, Row(20, "c", null), "engineering")))
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a", true), "sales"),
+ Row(1, Row(10, "b", null), "hr"),
+ Row(2, Row(20, "c", null), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `s`.`c3`."))
+ }
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
test("merge with null struct") {
@@ -4934,128 +5115,142 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
test("merge null struct with non-nullable nested field - source with missing
" +
"and extra nested fields") {
- withSQLConf(
- SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> "true") {
- withTempView("source") {
- // Target table has nested struct with NON-NULLABLE field b
- createAndInitTable(
- s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING NOT NULL>>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, "dep":
"sales" }
- |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, "dep":
"hr" }"""
- .stripMargin)
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING NOT NULL>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
+ .stripMargin)
- // Source table has missing field 'b' and extra field 'c' in nested struct
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- // missing field 'b' (which is non-nullable in target)
- StructField("c", StringType) // extra field 'c'
- )))
- ))),
- StructField("dep", StringType)
- ))
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("c", StringType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source")
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source")
- val mergeStmt =
- s"""MERGE WITH SCHEMA EVOLUTION
- |INTO $tableNameAsString t USING source
- |ON t.pk = source.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause
+ |INTO $tableNameAsString t USING source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- // All cases should fail due to non-nullable constraint violation
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`b`"))
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains("Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `s`.`c2`.`b`."))
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
test("merge with null struct using default value") {
- withTempView("source") {
- // Target table has nested struct with a default value
- sql(
- s"""CREATE TABLE $tableNameAsString (
- | pk INT NOT NULL,
- | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
- | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')),
- | dep STRING)
- |PARTITIONED BY (dep)
- |""".stripMargin)
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ | pk INT NOT NULL,
+ | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
+ | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')),
+ | dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
- // Insert initial data using DataFrame API
- val initialSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- StructField("b", StringType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- val initialData = Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, Row(2, Row(20, "y")), "hr")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema)
- .writeTo(tableNameAsString).append()
+ val initialSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", StringType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val initialData = Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, Row(2, Row(20, "y")), "hr")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema)
+ .writeTo(tableNameAsString).append()
- // Source table has null for the nested struct
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- // missing field 'b'
- )))
- ))),
- StructField("dep", StringType)
- ))
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source")
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source")
- sql(
- s"""MERGE INTO $tableNameAsString t USING source
- |ON t.pk = source.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
+ val mergeStmt =
+ s"""MERGE INTO $tableNameAsString t USING source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+
+ if (coerceNestedTypes) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`b`"))
+ }
+ }
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
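
One nuance the default-value test pins down: the expected rows hold NULL, not the declared default, because MERGE carries the source's explicit NULL struct through UPDATE SET * / INSERT *, while a column DEFAULT only applies when an INSERT omits the column. A hedged illustration with placeholder names (not from the patch):

// Placeholder sketch: DEFAULT fills omitted columns only; explicit NULLs are written as-is.
sql("INSERT INTO target (pk, dep) VALUES (9, 'ops')") // s -> declared DEFAULT
sql("INSERT INTO target VALUES (10, NULL, 'ops')")    // s -> NULL, DEFAULT not applied
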
@@ -5170,8 +5365,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
- Row(1, Row(10, Row(20, true)), "sales"),
- Row(2, Row(20, Row(30, false)), "engineering")))
+ Row(1, Row(10, Row(20, null)), "sales"),
+ Row(2, Row(20, Row(30, null)), "engineering")))
} else {
val exception = intercept[Exception] {
sql(mergeStmt)
@@ -5254,222 +5449,46 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
.writeTo(tableNameAsString).append()
val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("salary", IntegerType),
- Column.create("dep", StringType),
- Column.create("new_col", IntegerType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
-
- sql(s"INSERT INTO $sourceTable VALUES (1, 101, 'support', 1)," +
- s"(3, 301, 'support', 3), (4, 401, 'finance', 4)")
-
- val mergeBuilder = spark.table(sourceTable)
- .mergeInto(tableNameAsString,
- $"source_table.pk" === col(tableNameAsString + ".pk"))
- .whenMatched()
- .updateAll()
- .whenNotMatched()
- .insertAll()
-
- if (withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 101, "support", 1),
- Row(2, 200, "software", null),
- Row(3, 301, "support", 3),
- Row(4, 401, "finance", 4)))
- } else {
- mergeBuilder.merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 101, "support"),
- Row(2, 200, "software"),
- Row(3, 301, "support"),
- Row(4, 401, "finance")))
- }
-
- sql(s"DROP TABLE $tableNameAsString")
- }
- }
- }
-
- test("merge schema evolution new column with set explicit column using
dataframe API") {
- Seq(true, false).foreach { withSchemaEvolution =>
- val sourceTable = "cat.ns1.source_table"
- withTable(sourceTable) {
- sql(s"CREATE TABLE $tableNameAsString (pk INT NOT NULL, salary INT,
dep STRING)")
-
- val targetData = Seq(
- Row(1, 100, "hr"),
- Row(2, 200, "software"),
- Row(3, 300, "hr"),
- Row(4, 400, "marketing"),
- Row(5, 500, "executive")
- )
- val targetSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("salary", IntegerType),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
- .writeTo(tableNameAsString).append()
-
- val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("salary", IntegerType),
- Column.create("dep", StringType),
- Column.create("active", BooleanType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
-
- sql(s"INSERT INTO $sourceTable VALUES (4, 150, 'dummy', true)," +
- s"(5, 250, 'dummy', true), (6, 350, 'dummy', false)")
-
- val mergeBuilder = spark.table(sourceTable)
- .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
- .whenMatched()
- .update(Map("dep" -> lit("software"), "active" ->
col("source_table.active")))
- .whenNotMatched()
- .insert(Map("pk" -> col("source_table.pk"), "salary" -> lit(0),
- "dep" -> col("source_table.dep"), "active" ->
col("source_table.active")))
-
- if (withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 100, "hr", null),
- Row(2, 200, "software", null),
- Row(3, 300, "hr", null),
- Row(4, 400, "software", true),
- Row(5, 500, "software", true),
- Row(6, 0, "dummy", false)))
- } else {
- val e = intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
- }
- assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
- assert(e.getMessage.contains("A column, variable, or function parameter with name " +
- "`active` cannot be resolved"))
- }
-
- sql(s"DROP TABLE $tableNameAsString")
- }
- }
- }
-
- test("merge schema evolution add column with nested struct and set explicit
columns " +
- "using dataframe API") {
- Seq(true, false).foreach { withSchemaEvolution =>
- val sourceTable = "cat.ns1.source_table"
- withTable(sourceTable) {
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
- |dep STRING)""".stripMargin)
-
- val targetData = Seq(
- Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
- )
- val targetSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType))
- )))
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
- .writeTo(tableNameAsString).append()
-
- val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType) // new column
- )))
- ))),
- Column.create("dep", StringType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
-
- val data = Seq(
- Row(1, Row(10, Row(Array(3, 4), Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Array(4, 5), Map("e" -> "f"), true)), "engineering")
- )
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source_temp")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("salary", IntegerType),
+ Column.create("dep", StringType),
+ Column.create("new_col", IntegerType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
- sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+ sql(s"INSERT INTO $sourceTable VALUES (1, 101, 'support', 1)," +
+ s"(3, 301, 'support', 3), (4, 401, 'finance', 4)")
val mergeBuilder = spark.table(sourceTable)
- .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
+ .mergeInto(tableNameAsString,
+ $"source_table.pk" === col(tableNameAsString + ".pk"))
.whenMatched()
- .update(Map(
- "s.c1" -> lit(-1),
- "s.c2.m" -> map(lit("k"), lit("v")),
- "s.c2.a" -> array(lit(-1)),
- "s.c2.c3" -> col("source_table.s.c2.c3")))
+ .updateAll()
.whenNotMatched()
- .insert(Map(
- "pk" -> col("source_table.pk"),
- "s" -> struct(
- col("source_table.s.c1").as("c1"),
- struct(
- col("source_table.s.c2.a").as("a"),
- map(lit("g"), lit("h")).as("m"),
- lit(true).as("c3")
- ).as("c2")
- ),
- "dep" -> col("source_table.dep")))
+ .insertAll()
if (withSchemaEvolution) {
mergeBuilder.withSchemaEvolution().merge()
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)), "hr"),
- Row(2, Row(20, Row(Seq(4, 5), Map("g" -> "h"), true)), "engineering")))
+ Seq(
+ Row(1, 101, "support", 1),
+ Row(2, 200, "software", null),
+ Row(3, 301, "support", 3),
+ Row(4, 401, "finance", 4)))
} else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
- }
- assert(exception.errorClass.get == "FIELD_NOT_FOUND")
- assert(exception.getMessage.contains("No such struct field `c3` in
`a`, `m`. "))
+ mergeBuilder.merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 101, "support"),
+ Row(2, 200, "software"),
+ Row(3, 301, "support"),
+ Row(4, 401, "finance")))
}
sql(s"DROP TABLE $tableNameAsString")
@@ -5477,29 +5496,22 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}
- test("merge schema evolution add column with nested struct and set all
columns " +
- "using dataframe API") {
+ test("merge schema evolution new column with set explicit column using
dataframe API") {
Seq(true, false).foreach { withSchemaEvolution =>
val sourceTable = "cat.ns1.source_table"
withTable(sourceTable) {
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
- |dep STRING)""".stripMargin)
+ sql(s"CREATE TABLE $tableNameAsString (pk INT NOT NULL, salary INT,
dep STRING)")
val targetData = Seq(
- Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
+ Row(1, 100, "hr"),
+ Row(2, 200, "software"),
+ Row(3, 300, "hr"),
+ Row(4, 400, "marketing"),
+ Row(5, 500, "executive")
)
val targetSchema = StructType(Seq(
StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType))
- )))
- ))),
+ StructField("salary", IntegerType),
StructField("dep", StringType)
))
spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
@@ -5508,62 +5520,44 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
val sourceIdent = Identifier.of(Array("ns1"), "source_table")
val columns = Array(
Column.create("pk", IntegerType, false),
- Column.create("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType) // new column
- )))
- ))),
- Column.create("dep", StringType))
+ Column.create("salary", IntegerType),
+ Column.create("dep", StringType),
+ Column.create("active", BooleanType))
val tableInfo = new TableInfo.Builder()
.withColumns(columns)
.withProperties(extraTableProps)
.build()
catalog.createTable(sourceIdent, tableInfo)
- val data = Seq(
- Row(1, Row(10, Row(Array(3, 4), Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Array(4, 5), Map("e" -> "f"), true)), "engineering")
- )
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source_temp")
-
- sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+ sql(s"INSERT INTO $sourceTable VALUES (4, 150, 'dummy', true)," +
+ s"(5, 250, 'dummy', true), (6, 350, 'dummy', false)")
val mergeBuilder = spark.table(sourceTable)
.mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
.whenMatched()
- .updateAll()
+ .update(Map("dep" -> lit("software"), "active" ->
col("source_table.active")))
.whenNotMatched()
- .insertAll()
+ .insert(Map("pk" -> col("source_table.pk"), "salary" -> lit(0),
+ "dep" -> col("source_table.dep"), "active" ->
col("source_table.active")))
if (withSchemaEvolution) {
mergeBuilder.withSchemaEvolution().merge()
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(1, Row(10, Row(Seq(3, 4), Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Seq(4, 5), Map("e" -> "f"), true)), "engineering")))
+ Seq(
+ Row(1, 100, "hr", null),
+ Row(2, 200, "software", null),
+ Row(3, 300, "hr", null),
+ Row(4, 400, "software", true),
+ Row(5, 500, "software", true),
+ Row(6, 0, "dummy", false)))
} else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ val e = intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
}
- assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+ assert(e.getMessage.contains("A column, variable, or function parameter with name " +
+ "`active` cannot be resolved"))
}
sql(s"DROP TABLE $tableNameAsString")
@@ -5571,8 +5565,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}
- test("merge schema evolution replace column with nested struct and " +
- "set explicit columns using dataframe API") {
+ test("merge schema evolution add column with nested struct and set explicit
columns " +
+ "using dataframe API") {
Seq(true, false).foreach { withSchemaEvolution =>
val sourceTable = "cat.ns1.source_table"
withTable(sourceTable) {
@@ -5605,7 +5599,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Column.create("s", StructType(Seq(
StructField("c1", IntegerType),
StructField("c2", StructType(Seq(
- // removed column 'a'
+ StructField("a", ArrayType(IntegerType)),
StructField("m", MapType(StringType, StringType)),
StructField("c3", BooleanType) // new column
)))
@@ -5618,14 +5612,15 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
catalog.createTable(sourceIdent, tableInfo)
val data = Seq(
- Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+ Row(1, Row(10, Row(Array(3, 4), Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Array(4, 5), Map("e" -> "f"), true)), "engineering")
)
val sourceTableSchema = StructType(Seq(
StructField("pk", IntegerType, nullable = false),
StructField("s", StructType(Seq(
StructField("c1", IntegerType),
StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
StructField("m", MapType(StringType, StringType)),
StructField("c3", BooleanType)
)))
@@ -5651,7 +5646,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
"s" -> struct(
col("source_table.s.c1").as("c1"),
struct(
- array(lit(-2)).as("a"),
+ col("source_table.s.c2.a").as("a"),
map(lit("g"), lit("h")).as("m"),
lit(true).as("c3")
).as("c2")
@@ -5663,7 +5658,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)), "hr"),
- Row(2, Row(20, Row(Seq(-2), Map("g" -> "h"), true)), "engineering")))
+ Row(2, Row(20, Row(Seq(4, 5), Map("g" -> "h"), true)), "engineering")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
@@ -5677,7 +5672,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}
- test("merge schema evolution replace column with nested struct and set all
columns " +
+ test("merge schema evolution add column with nested struct and set all
columns " +
"using dataframe API") {
Seq(true, false).foreach { withSchemaEvolution =>
val sourceTable = "cat.ns1.source_table"
@@ -5686,26 +5681,24 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
s"""CREATE TABLE $tableNameAsString (
|pk INT NOT NULL,
|s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
- |dep STRING)
- |PARTITIONED BY (dep)
- |""".stripMargin)
+ |dep STRING)""".stripMargin)
- val tableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType))
- )))
- ))),
- StructField("dep", StringType)
- ))
- val targetData = Seq(
- Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), tableSchema)
- .coalesce(1).writeTo(tableNameAsString).append()
+ val targetData = Seq(
+ Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
+ )
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
+ StructField("m", MapType(StringType, StringType))
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
+ .writeTo(tableNameAsString).append()
val sourceIdent = Identifier.of(Array("ns1"), "source_table")
val columns = Array(
@@ -5713,7 +5706,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Column.create("s", StructType(Seq(
StructField("c1", IntegerType),
StructField("c2", StructType(Seq(
- // missing column 'a'
+ StructField("a", ArrayType(IntegerType)),
StructField("m", MapType(StringType, StringType)),
StructField("c3", BooleanType) // new column
)))
@@ -5725,22 +5718,23 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
.build()
catalog.createTable(sourceIdent, tableInfo)
- val sourceData = Seq(
- Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+ val data = Seq(
+ Row(1, Row(10, Row(Array(3, 4), Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Array(4, 5), Map("e" -> "f"), true)), "engineering")
)
val sourceTableSchema = StructType(Seq(
StructField("pk", IntegerType, nullable = false),
StructField("s", StructType(Seq(
StructField("c1", IntegerType),
StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
StructField("m", MapType(StringType, StringType)),
StructField("c3", BooleanType)
)))
))),
StructField("dep", StringType)
))
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
.createOrReplaceTempView("source_temp")
sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
@@ -5748,105 +5742,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
val mergeBuilder = spark.table(sourceTable)
.mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
.whenMatched()
- .updateAll()
- .whenNotMatched()
- .insertAll()
-
- if (withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `s`.`c2`"))
- }
-
- sql(s"DROP TABLE $tableNameAsString")
- }
- }
- }
-
- test("merge schema evolution replace column with nested struct and " +
- "update top level struct using dataframe API") {
- Seq(true, false).foreach { withSchemaEvolution =>
- val sourceTable = "cat.ns1.source_table"
- withTable(sourceTable) {
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
- |dep STRING)
- |PARTITIONED BY (dep)
- |""".stripMargin)
-
- val tableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", ArrayType(IntegerType)),
- StructField("m", MapType(StringType, StringType))
- )))
- ))),
- StructField("dep", StringType)
- ))
- val targetData = Seq(
- Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), tableSchema)
- .coalesce(1).writeTo(tableNameAsString).append()
-
- // Create source table
- val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- // missing column 'a'
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType) // new column
- )))
- ))),
- Column.create("dep", StringType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
-
- val sourceData = Seq(
- Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
- )
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("m", MapType(StringType, StringType)),
- StructField("c3", BooleanType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
- .createOrReplaceTempView("source_temp")
-
- sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
-
- val mergeBuilder = spark.table(sourceTable)
- .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
- .whenMatched()
- .update(Map("s" -> col("source_table.s")))
+ .updateAll()
.whenNotMatched()
.insertAll()
@@ -5854,15 +5750,13 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
mergeBuilder.withSchemaEvolution().merge()
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
+ Seq(Row(1, Row(10, Row(Seq(3, 4), Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Seq(4, 5), Map("e" -> "f"), true)), "engineering")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
assert(exception.getMessage.contains(
"Cannot write extra fields `c3` to the struct `s`.`c2`"))
}
@@ -5872,6 +5766,339 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}
+ test("merge schema evolution replace column with nested struct and " +
+ "set explicit columns using dataframe API") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ val sourceTable = "cat.ns1.source_table"
+ withTable(sourceTable) {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
+ |dep STRING)""".stripMargin)
+
+ val targetData = Seq(
+ Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
+ )
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
+ StructField("m", MapType(StringType, StringType))
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
+ .writeTo(tableNameAsString).append()
+
+ val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ // removed column 'a'
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType) // new column
+ )))
+ ))),
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
+
+ val data = Seq(
+ Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+ )
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source_temp")
+
+ sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+
+ val mergeBuilder = spark.table(sourceTable)
+ .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map(
+ "s.c1" -> lit(-1),
+ "s.c2.m" -> map(lit("k"), lit("v")),
+ "s.c2.a" -> array(lit(-1)),
+ "s.c2.c3" -> col("source_table.s.c2.c3")))
+ .whenNotMatched()
+ .insert(Map(
+ "pk" -> col("source_table.pk"),
+ "s" -> struct(
+ col("source_table.s.c1").as("c1"),
+ struct(
+ array(lit(-2)).as("a"),
+ map(lit("g"), lit("h")).as("m"),
+ lit(true).as("c3")
+ ).as("c2")
+ ),
+ "dep" -> col("source_table.dep")))
+
+ if (coerceNestedTypes && withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"), false)), "hr"),
+ Row(2, Row(20, Row(Seq(-2), Map("g" -> "h"), true)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get == "FIELD_NOT_FOUND")
+ assert(exception.getMessage.contains("No such struct field `c3`
in `a`, `m`. "))
+ }
+ }
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+ }
+
+ test("merge schema evolution replace column with nested struct and set all
columns " +
+ "using dataframe API") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ val sourceTable = "cat.ns1.source_table"
+ withTable(sourceTable) {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
+ |dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
+
+ val tableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
+ StructField("m", MapType(StringType, StringType))
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val targetData = Seq(
+ Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), tableSchema)
+ .coalesce(1).writeTo(tableNameAsString).append()
+
+ val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ // missing column 'a'
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType) // new column
+ )))
+ ))),
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
+
+ val sourceData = Seq(
+ Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+ )
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
+ .createOrReplaceTempView("source_temp")
+
+ sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+
+ val mergeBuilder = spark.table(sourceTable)
+ .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ }
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`a`"))
+ }
+
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+ }
+ }
+
+ test("merge schema evolution replace column with nested struct and " +
+ "update top level struct using dataframe API") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ val sourceTable = "cat.ns1.source_table"
+ withTable(sourceTable) {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: ARRAY<INT>, m: MAP<STRING,
STRING>>>,
+ |dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
+
+ val tableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", ArrayType(IntegerType)),
+ StructField("m", MapType(StringType, StringType))
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val targetData = Seq(
+ Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), tableSchema)
+ .coalesce(1).writeTo(tableNameAsString).append()
+
+ // Create source table
+ val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ // missing column 'a'
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType) // new column
+ )))
+ ))),
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
+
+ val sourceData = Seq(
+ Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering")
+ )
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("m", MapType(StringType, StringType)),
+ StructField("c3", BooleanType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
+ .createOrReplaceTempView("source_temp")
+
+ sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+
+ val mergeBuilder = spark.table(sourceTable)
+ .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .update(Map("s" -> col("source_table.s")))
+ .whenNotMatched()
+ .insertAll()
+
+ if (coerceNestedTypes) {
+ if (withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ assert(exception.getMessage.contains(
+ "Cannot write extra fields `c3` to the struct `s`.`c2`"))
+ }
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot find data for the output column `s`.`c2`.`a`"))
+ }
+
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
+ }
+ }
+ }
+
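
Taken together, the set-all-columns and update-top-level-struct variants above walk a 2x2 matrix of withSchemaEvolution x coerceNestedTypes. A compact distillation of the outcomes they assert (a summary sketch, not code from the patch; the set-explicit-columns variant instead fails with FIELD_NOT_FOUND unless both flags are on):

// Outcome matrix for updateAll/insertAll with a source struct that
// drops target field 'a' and adds new field 'c3'.
(coerceNestedTypes, withSchemaEvolution) match {
  case (true, true) =>
    // merge succeeds: dropped field 'a' is NULL-filled, 'c3' evolves into the target
  case (true, false) =>
    // AnalysisException INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS ('c3' is extra)
  case (false, _) =>
    // AnalysisException INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA ('a' is missing)
}
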
test("merge schema evolution should not evolve referencing new column " +
"via transform using dataframe API") {
Seq(true, false).foreach { withSchemaEvolution =>
@@ -5933,76 +6160,98 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
test("merge into with source missing fields in top-level struct using
dataframe API") {
- val sourceTable = "cat.ns1.source_table"
- withTable(sourceTable) {
- // Target table has struct with 3 fields at top level
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
- |dep STRING)""".stripMargin)
-
- val targetData = Seq(
- Row(0, Row(1, "a", true), "sales")
- )
- val targetSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType),
- StructField("c3", BooleanType)
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
- .writeTo(tableNameAsString).append()
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ val sourceTable = "cat.ns1.source_table"
+ withTable(sourceTable) {
+ // Target table has struct with 3 fields at top level
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
+ |dep STRING)""".stripMargin)
- // Create source table with struct having only 2 fields (c1, c2) - missing c3
- val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))), // missing c3 field
- Column.create("dep", StringType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
+ val targetData = Seq(
+ Row(0, Row(1, "a", true), "sales")
+ )
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
+ .writeTo(tableNameAsString).append()
- val data = Seq(
- Row(1, Row(10, "b"), "hr"),
- Row(2, Row(20, "c"), "engineering")
- )
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))),
- StructField("dep", StringType)))
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source_temp")
+ // Create source table with struct having only 2 fields (c1, c2) - missing c3
+ val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))), // missing c3 field
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
- sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+ val data = Seq(
+ Row(1, Row(10, "b"), "hr"),
+ Row(2, Row(20, "c"), "engineering")
+ )
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))),
+ StructField("dep", StringType)))
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source_temp")
- spark.table(sourceTable)
- .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
- .whenMatched()
- .updateAll()
- .whenNotMatched()
- .insertAll()
- .merge()
+ sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, "a", true), "sales"),
- Row(1, Row(10, "b", null), "hr"),
- Row(2, Row(20, "c", null), "engineering")))
+ if (coerceNestedTypes) {
+ spark.table(sourceTable)
+ .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a", true), "sales"),
+ Row(1, Row(10, "b", null), "hr"),
+ Row(2, Row(20, "c", null), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ spark.table(sourceTable)
+ .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+ .merge()
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
+ assert(exception.getMessage.contains(
+ "Cannot write incompatible data for the table ``: " +
+ "Cannot find data for the output column `s`.`c3`."))
+ }
- sql(s"DROP TABLE $tableNameAsString")
+ sql(s"DROP TABLE $tableNameAsString")
+ }
+ }
}
}
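
For reference, the Dataset merge builder these dataframe-API tests drive, collapsed into one chain (placeholder table names; the chain mirrors the calls in the tests above, and withSchemaEvolution() is the optional step the evolution tests toggle):

// Sketch of the mergeInto builder chain; "source" and "target" are placeholders.
import org.apache.spark.sql.functions.col

spark.table("source")
  .mergeInto("target", col("source.pk") === col("target.pk"))
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .withSchemaEvolution() // optional: allow the target schema to evolve
  .merge()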