This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 16a3f6c36b9d [SPARK-54595][SQL] Keep existing behavior of MERGE INTO without SCHEMA EVOLUTION clause
16a3f6c36b9d is described below
commit 16a3f6c36b9dda0f009dd3e9e9a8fcb002fce9d8
Author: Szehon Ho <[email protected]>
AuthorDate: Fri Dec 5 16:59:20 2025 -0800
[SPARK-54595][SQL] Keep existing behavior of MERGE INTO without SCHEMA EVOLUTION clause
### What changes were proposed in this pull request?
Keep the existing behavior of MERGE INTO without a SCHEMA EVOLUTION clause for
UPDATE SET * and INSERT *, as well as for UPDATE or INSERT of structs: throw an
exception if the source and target schemas are not exactly the same.
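As an illustration, assume hypothetical tables `target(pk INT, salary INT, dep STRING)` and `source(pk INT, dep STRING)`, where the source is missing the target's `salary` column; a minimal sketch of the restored behavior:

```sql
-- Without a SCHEMA EVOLUTION clause, this fails analysis again (as in 4.0)
-- because `salary` cannot be resolved from the source; the tests below
-- expect error class UNRESOLVED_COLUMN.WITH_SUGGESTION for this case.
MERGE INTO target t
USING source s
ON t.pk = s.pk
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;
```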
### Why are the changes needed?
While testing this feature, aokolnychyi noticed that as of Spark 4.1 the
behavior of MERGE INTO changed even without the SCHEMA EVOLUTION clause.
In particular:
- Source has fewer columns/nested fields than the target => we fill with NULL
or DEFAULT for inserts, and with the existing value for updates (though this
was disabled for nested structs by default in
[SPARK-54525](https://issues.apache.org/jira/browse/SPARK-54525), see
[#53229](https://github.com/apache/spark/pull/53229)).
- Source has more columns/fields than the target => we drop the extra fields.
Initially this looked like a good improvement to MERGE INTO, not strictly
related to SCHEMA EVOLUTION because the schema itself is not altered. But
Anton made a good point that it may surprise some users, so it is better for
now to be conservative and keep exactly the same behavior when no SCHEMA
EVOLUTION clause is given.
Note: this behavior is still enabled if SCHEMA EVOLUTION is specified, as the
user is then explicit about the decision; the opt-in path is sketched below.
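Continuing the hypothetical `target`/`source` example above:

```sql
-- With the explicit clause the merge is allowed: on UPDATE the missing
-- `salary` keeps its existing target value, and on INSERT it is filled
-- with NULL (or the column DEFAULT, if one is defined).
MERGE WITH SCHEMA EVOLUTION INTO target t
USING source s
ON t.pk = s.pk
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;
```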
### Does this PR introduce _any_ user-facing change?
No, this keeps the behavior exactly the same as 4.0 when no SCHEMA EVOLUTION
clause is given.
### How was this patch tested?
Added a test and updated existing tests' expected output to expect the
exception when SCHEMA EVOLUTION is not specified.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53326 from szehon-ho/merge_restriction.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 74b6a931250a9aad5d37b49c33af79e4411e4ef4)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 10 +-
.../ResolveRowLevelCommandAssignments.scala | 2 +-
.../sql/connector/MergeIntoTableSuiteBase.scala | 1387 +++++++++-----------
.../execution/command/PlanResolutionSuite.scala | 98 +-
4 files changed, 711 insertions(+), 786 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 08c31939f161..23d32dd87db1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1736,9 +1736,8 @@ class Analyzer(
Assignment(key, sourceAttr)
}
} else {
- sourceTable.output.flatMap { sourceAttr =>
- findAttrInTarget(sourceAttr.name).map(
- targetAttr => Assignment(targetAttr, sourceAttr))
+ targetTable.output.map { attr =>
+ Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
}
}
UpdateAction(
@@ -1775,9 +1774,8 @@ class Analyzer(
Assignment(key, sourceAttr)
}
} else {
- sourceTable.output.flatMap { sourceAttr =>
- findAttrInTarget(sourceAttr.name).map(
- targetAttr => Assignment(targetAttr, sourceAttr))
+ targetTable.output.map { attr =>
+ Assignment(attr, UnresolvedAttribute(Seq(attr.name)))
}
}
InsertAction(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
index d1b8eab13191..bf1016ba8268 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
@@ -53,7 +53,7 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned &&
!m.needSchemaEvolution =>
validateStoreAssignmentPolicy()
- val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes
+ val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes && m.withSchemaEvolution
m.copy(
targetTable = cleanAttrMetadata(m.targetTable),
matchedActions = alignActions(m.targetTable.output, m.matchedActions,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index e59b4435c408..611daef36db0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -2404,7 +2404,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
sourceDF.createOrReplaceTempView("source")
val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
- sql(s"""MERGE $schemaEvolutionClause
+ val mergeStmt = s"""MERGE $schemaEvolutionClause
|INTO $tableNameAsString t
|USING source s
|ON t.pk = s.pk
@@ -2412,8 +2412,9 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
| UPDATE SET *
|WHEN NOT MATCHED THEN
| INSERT *
- |""".stripMargin)
+ |""".stripMargin
if (withSchemaEvolution && schemaEvolutionEnabled) {
+ sql(mergeStmt)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
@@ -2424,15 +2425,12 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Row(5, 250, "executive", true),
Row(6, 350, null, false)))
} else {
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 100, "hr"),
- Row(2, 200, "software"),
- Row(3, 300, "hr"),
- Row(4, 150, "marketing"),
- Row(5, 250, "executive"),
- Row(6, 350, null)))
+ val e = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+ assert(e.message.contains("A column, variable, or function parameter with name " +
+ "`dep` cannot be resolved"))
}
sql(s"DROP TABLE $tableNameAsString")
}
@@ -2463,7 +2461,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
sourceDF.createOrReplaceTempView("source")
val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
- sql(s"""MERGE $schemaEvolutionClause
+ val mergeStmt = s"""MERGE $schemaEvolutionClause
|INTO $tableNameAsString t
|USING source s
|ON t.pk = s.pk
@@ -2471,8 +2469,9 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
| UPDATE SET *
|WHEN NOT MATCHED THEN
| INSERT *
- |""".stripMargin)
+ |""".stripMargin
if (withSchemaEvolution && schemaEvolutionEnabled) {
+ sql(mergeStmt)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
Seq(
@@ -2483,15 +2482,12 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Row(5, 250, "executive", true),
Row(6, 350, "unknown", false)))
} else {
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, 100, "hr"),
- Row(2, 200, "software"),
- Row(3, 300, "hr"),
- Row(4, 150, "marketing"),
- Row(5, 250, "executive"),
- Row(6, 350, "unknown")))
+ val e = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(e.errorClass.get == "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+ assert(e.getMessage.contains("A column, variable, or function parameter with name " +
+ "`dep` cannot be resolved"))
}
sql(s"DROP TABLE $tableNameAsString")
}
@@ -3239,23 +3235,13 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
|WHEN NOT MATCHED THEN
| INSERT *
|""".stripMargin
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `s`.`c2`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
@@ -3335,30 +3321,18 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
|WHEN NOT MATCHED THEN
| INSERT *
|""".stripMargin
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `s`.`c2`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `s`.`c2`.`a`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3534,30 +3508,18 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), "hr"),
- Row(1, Map(Row(10, null, true) -> Row("y", null, false)), "sales"),
- Row(2, Map(Row(20, null, false) -> Row("z", null, true)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `m`.`key`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), "hr"),
+ Row(1, Map(Row(10, null, true) -> Row("y", null, false)), "sales"),
+ Row(2, Map(Row(20, null, false) -> Row("z", null, true)), "engineering")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `m`.`key`.`c2`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3612,30 +3574,18 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
| INSERT (pk, m, dep) VALUES (src.pk, src.m, 'my_new_dep')
|""".stripMargin
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), "hr"),
- Row(1, Map(Row(10, null, true) -> Row("y", null, false)), "my_old_dep"),
- Row(2, Map(Row(20, null, false) -> Row("z", null, true)), "my_new_dep")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `m`.`key`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Map(Row(10, 10, null) -> Row("c", "c", null)), "hr"),
+ Row(1, Map(Row(10, null, true) -> Row("y", null, false)), "my_old_dep"),
+ Row(2, Map(Row(20, null, false) -> Row("z", null, true)), "my_new_dep")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `m`.`key`.`c2`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3688,30 +3638,18 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Array(Row(10, 10, null)), "hr"),
- Row(1, Array(Row(10, null, true)), "sales"),
- Row(2, Array(Row(20, null, false)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct
`a`.`element`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Array(Row(10, 10, null)), "hr"),
+ Row(1, Array(Row(10, null, true)), "sales"),
+ Row(2, Array(Row(20, null, false)), "engineering")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `a`.`element`.`c2`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -3764,30 +3702,18 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
| INSERT (pk, a, dep) VALUES (src.pk, src.a, 'my_new_dep')
|""".stripMargin
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(Row(0, Array(Row(10, 10, null)), "hr"),
- Row(1, Array(Row(10, null, true)), "my_old_dep"),
- Row(2, Array(Row(20, null, false)), "my_new_dep")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct
`a`.`element`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(Row(0, Array(Row(10, 10, null)), "hr"),
+ Row(1, Array(Row(10, null, true)), "my_old_dep"),
+ Row(2, Array(Row(20, null, false)), "my_new_dep")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `a`.`element`.`c2`"))
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
@@ -4051,6 +3977,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"UNRESOLVED_COLUMN.WITH_SUGGESTION")
+ assert(exception.message.contains(" A column, variable, or function parameter with name "
+ + "`bonus` cannot be resolved"))
}
sql(s"DROP TABLE $tableNameAsString")
@@ -4484,267 +4412,321 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
test("merge into with source missing fields in struct nested in array") {
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has struct with 3 fields (c1, c2, c3) in array
- createAndInitTable(
- s"""pk INT NOT NULL,
- |a ARRAY<STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "a": [ { "c1": 1, "c2": "a", "c3": true } ], "dep":
"sales" }
- |{ "pk": 1, "a": [ { "c1": 2, "c2": "b", "c3": false } ],
"dep": "sales" }"""
- .stripMargin)
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 3 fields (c1, c2, c3) in array
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |a ARRAY<STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "a": [ { "c1": 1, "c2": "a", "c3": true } ],
"dep": "sales" }
+ |{ "pk": 1, "a": [ { "c1": 2, "c2": "b", "c3": false } ],
"dep": "sales" }"""
+ .stripMargin)
- // Source table has struct with only 2 fields (c1, c2) - missing c3
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("a", ArrayType(
- StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType))))), // missing c3 field
- StructField("dep", StringType)))
- val data = Seq(
- Row(1, Array(Row(10, "c")), "hr"),
- Row(2, Array(Row(30, "e")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 2 fields (c1, c2) - missing c3
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("a", ArrayType(
+ StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))), // missing c3 field
+ StructField("dep", StringType)))
+ val data = Seq(
+ Row(1, Array(Row(10, "c")), "hr"),
+ Row(2, Array(Row(30, "e")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source")
- val mergeStmt =
- s"""MERGE INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes) {
- sql(mergeStmt)
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Array(Row(1, "a", true)), "sales"),
- Row(1, Array(Row(10, "c", null)), "hr"),
- Row(2, Array(Row(30, "e", null)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Array(Row(1, "a", true)), "sales"),
+ Row(1, Array(Row(10, "c", null)), "hr"),
+ Row(2, Array(Row(30, "e", null)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `a`.`element`.`c3`."))
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
test("merge into with source missing fields in struct nested in map key") {
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has struct with 2 fields in map key
- val targetSchema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 2 fields in map key
+ val targetSchema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType), StructField("c2", BooleanType))),
+ StructType(Seq(StructField("c3", StringType))))),
+ StructField("dep", StringType)))
+ createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
+
+ val targetData = Seq(
+ Row(0, Map(Row(10, true) -> Row("x")), "hr"),
+ Row(1, Map(Row(20, false) -> Row("y")), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
+ .writeTo(tableNameAsString).append()
+
+ // Source table has struct with only 1 field (c1) in map key - missing c2
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType), StructField("c2", BooleanType))),
+ StructType(Seq(StructField("c1", IntegerType))), // missing c2
StructType(Seq(StructField("c3", StringType))))),
StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
-
- val targetData = Seq(
- Row(0, Map(Row(10, true) -> Row("x")), "hr"),
- Row(1, Map(Row(20, false) -> Row("y")), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
- .writeTo(tableNameAsString).append()
-
- // Source table has struct with only 1 field (c1) in map key - missing c2
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType))), // missing c2
- StructType(Seq(StructField("c3", StringType))))),
- StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Map(Row(10) -> Row("z")), "sales"),
- Row(2, Map(Row(20) -> Row("w")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
- .createOrReplaceTempView("source")
+ val sourceData = Seq(
+ Row(1, Map(Row(10) -> Row("z")), "sales"),
+ Row(2, Map(Row(20) -> Row("w")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
+ .createOrReplaceTempView("source")
- val mergeStmt =
- s"""MERGE INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes) {
- sql(mergeStmt)
- // Missing field c2 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Map(Row(10, true) -> Row("x")), "hr"),
- Row(1, Map(Row(10, null) -> Row("z")), "sales"),
- Row(2, Map(Row(20, null) -> Row("w")), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ // Missing field c2 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Map(Row(10, true) -> Row("x")), "hr"),
+ Row(1, Map(Row(10, null) -> Row("z")), "sales"),
+ Row(2, Map(Row(20, null) -> Row("w")), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `m`.`key`.`c2`."))
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
test("merge into with source missing fields in struct nested in map value") {
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has struct with 2 fields in map value
- val targetSchema =
- StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType))),
- StructType(Seq(StructField("c1", StringType),
StructField("c2", BooleanType))))),
- StructField("dep", StringType)))
- createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
-
- val targetData = Seq(
- Row(0, Map(Row(10) -> Row("x", true)), "hr"),
- Row(1, Map(Row(20) -> Row("y", false)), "sales"))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
- .writeTo(tableNameAsString).append()
-
- // Source table has struct with only 1 field (c1) in map value - missing c2
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("m", MapType(
- StructType(Seq(StructField("c1", IntegerType))),
- StructType(Seq(StructField("c1", StringType))))), // missing c2
- StructField("dep", StringType)))
- val sourceData = Seq(
- Row(1, Map(Row(10) -> Row("z")), "sales"),
- Row(2, Map(Row(20) -> Row("w")), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
- .createOrReplaceTempView("source")
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 2 fields in map value
+ val targetSchema =
+ StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType))),
+ StructType(Seq(StructField("c1", StringType), StructField("c2", BooleanType))))),
+ StructField("dep", StringType)))
+ createTable(CatalogV2Util.structTypeToV2Columns(targetSchema))
- val mergeStmt =
- s"""MERGE INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val targetData = Seq(
+ Row(0, Map(Row(10) -> Row("x", true)), "hr"),
+ Row(1, Map(Row(20) -> Row("y", false)), "sales"))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
+ .writeTo(tableNameAsString).append()
- if (coerceNestedTypes) {
- sql(mergeStmt)
- // Missing field c2 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Map(Row(10) -> Row("x", true)), "hr"),
- Row(1, Map(Row(10) -> Row("z", null)), "sales"),
- Row(2, Map(Row(20) -> Row("w", null)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ // Source table has struct with only 1 field (c1) in map value - missing c2
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("m", MapType(
+ StructType(Seq(StructField("c1", IntegerType))),
+ StructType(Seq(StructField("c1", StringType))))), // missing c2
+ StructField("dep", StringType)))
+ val sourceData = Seq(
+ Row(1, Map(Row(10) -> Row("z")), "sales"),
+ Row(2, Map(Row(20) -> Row("w")), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+
+ if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ // Missing field c2 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Map(Row(10) -> Row("x", true)), "hr"),
+ Row(1, Map(Row(10) -> Row("z", null)), "sales"),
+ Row(2, Map(Row(20) -> Row("w", null)), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `m`.`value`.`c2`."))
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
test("merge into with source missing fields in top-level struct") {
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has struct with 3 fields at top level
- createAndInitTable(
- s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": true }, "dep":
"sales"}""")
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has struct with 3 fields at top level
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": "a", "c3": true }, "dep":
"sales"}""")
- // Source table has struct with only 2 fields (c1, c2) - missing c3
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))), // missing c3 field
- StructField("dep", StringType)))
- val data = Seq(
- Row(1, Row(10, "b"), "hr"),
- Row(2, Row(20, "c"), "engineering")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source")
+ // Source table has struct with only 2 fields (c1, c2) - missing c3
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))), // missing c3 field
+ StructField("dep", StringType)))
+ val data = Seq(
+ Row(1, Row(10, "b"), "hr"),
+ Row(2, Row(20, "c"), "engineering")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source")
- val mergeStmt =
- s"""MERGE INTO $tableNameAsString t
- |USING source src
- |ON t.pk = src.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes) {
- sql(mergeStmt)
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, "a", true), "sales"),
- Row(1, Row(10, "b", null), "hr"),
- Row(2, Row(20, "c", null), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a", true), "sales"),
+ Row(1, Row(10, "b", null), "hr"),
+ Row(2, Row(20, "c", null), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `s`.`c3`."))
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
+ }
+ }
+ }
+ }
+
+ test("merge into with source missing top-level column") {
+ Seq(true, false).foreach { withSchemaEvolution =>
+ withTempView("source") {
+ // Target table has 3 columns: pk, salary, dep
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |salary INT,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "salary": 100, "dep": "sales" }
+ |{ "pk": 1, "salary": 200, "dep": "hr" }"""
+ .stripMargin)
+
+ // Source table has only 2 columns: pk, dep (missing salary)
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("dep", StringType)))
+ val data = Seq(
+ Row(1, "engineering"),
+ Row(2, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source")
+
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t
+ |USING source src
+ |ON t.pk = src.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
+
+ if (withSchemaEvolution) {
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, 100, "sales"),
+ Row(1, 200, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "UNRESOLVED_COLUMN.WITH_SUGGESTION")
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
@@ -4884,70 +4866,69 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
test("merge with with null struct with missing nested field") {
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- // Target table has nested struct with fields c1 and c2
- createAndInitTable(
- s"""pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
- |dep STRING""".stripMargin,
- """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
- |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
- .stripMargin)
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ // Target table has nested struct with fields c1 and c2
+ createAndInitTable(
+ s"""pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+ |dep STRING""".stripMargin,
+ """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } },
"dep": "sales" }
+ |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } },
"dep": "hr" }"""
+ .stripMargin)
- // Source table has null for the nested struct
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- // missing field 'b'
- )))
- ))),
- StructField("dep", StringType)
- ))
+ // Source table has null for the nested struct
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ // missing field 'b'
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source")
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source")
- val mergeStmt =
- s"""MERGE INTO $tableNameAsString t USING source
- |ON t.pk = source.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING
source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- // Without coercion, the merge should fail due to missing field
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `s`.`c2`.`b`."))
}
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
@@ -4998,37 +4979,21 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- // extra nested field is added
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x", null)), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- // extra nested field is not added
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- sql(mergeStmt)
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot write extra fields `c` to the struct `s`.`c2`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ // extra nested field is added
+ sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x", null)), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
} else {
- // Without source struct coercion, the merge should fail
val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `s`.`c2`.`b`."))
}
}
}
@@ -5097,82 +5062,83 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
test("merge with null struct using default value") {
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- withTempView("source") {
- sql(
- s"""CREATE TABLE $tableNameAsString (
- | pk INT NOT NULL,
- | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
- | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')),
- | dep STRING)
- |PARTITIONED BY (dep)
- |""".stripMargin)
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ withTempView("source") {
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ | pk INT NOT NULL,
+ | s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>> DEFAULT
+ | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')),
+ | dep STRING)
+ |PARTITIONED BY (dep)
+ |""".stripMargin)
- val initialSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- StructField("b", StringType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- val initialData = Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, Row(2, Row(20, "y")), "hr")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema)
- .writeTo(tableNameAsString).append()
+ val initialSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", StringType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ val initialData = Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, Row(2, Row(20, "y")), "hr")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema)
+ .writeTo(tableNameAsString).append()
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- )))
- ))),
- StructField("dep", StringType)
- ))
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source")
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source")
- val mergeStmt =
- s"""MERGE INTO $tableNameAsString t USING source
- |ON t.pk = source.pk
- |WHEN MATCHED THEN
- | UPDATE SET *
- |WHEN NOT MATCHED THEN
- | INSERT *
- |""".stripMargin
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else ""
+ val mergeStmt =
+ s"""MERGE $schemaEvolutionClause INTO $tableNameAsString t USING
source
+ |ON t.pk = source.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET *
+ |WHEN NOT MATCHED THEN
+ | INSERT *
+ |""".stripMargin
- if (coerceNestedTypes) {
- sql(mergeStmt)
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ if (coerceNestedTypes && withSchemaEvolution) {
sql(mergeStmt)
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ sql(mergeStmt)
+ }
+ assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `s`.`c2`.`b`"))
}
+ sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
- sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
}
@@ -5243,7 +5209,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
test("merge into with source missing fields in nested struct") {
- Seq(true, false).foreach { nestedTypeCoercion =>
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { nestedTypeCoercion =>
withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> nestedTypeCoercion.toString) {
withTempView("source") {
@@ -5275,7 +5242,8 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
.createOrReplaceTempView("source")
// Missing field b should be filled with NULL
- val mergeStmt = s"""MERGE INTO $tableNameAsString t
+ val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA
EVOLUTION" else ""
+ val mergeStmt = s"""MERGE $schemaEvolutionClause INTO
$tableNameAsString t
|USING source src
|ON t.pk = src.pk
|WHEN MATCHED THEN
@@ -5284,7 +5252,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
| INSERT *
|""".stripMargin
- if (nestedTypeCoercion) {
+ if (nestedTypeCoercion && withSchemaEvolution) {
sql(mergeStmt)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
@@ -5292,16 +5260,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
Row(1, Row(10, Row(20, null)), "sales"),
Row(2, Row(20, Row(30, null)), "engineering")))
} else {
- val exception = intercept[Exception] {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
sql(mergeStmt)
}
- assert(exception.getMessage.contains(
- """Cannot write incompatible data for the table ``""".stripMargin))
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
}
sql(s"DROP TABLE IF EXISTS $tableNameAsString")
}
}
+ }
}
test("merge with named_struct missing non-nullable field") {
@@ -5926,30 +5895,18 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
.whenNotMatched()
.insertAll()
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `s`.`c2`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `s`.`c2`.`a`"))
}
sql(s"DROP TABLE $tableNameAsString")
@@ -6038,30 +5995,18 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
.whenNotMatched()
.insertAll()
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
- Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write extra fields `c3` to the struct `s`.`c2`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"),
+ Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering")))
} else {
val exception = intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot find data for the output column `s`.`c2`.`a`"))
}
sql(s"DROP TABLE $tableNameAsString")
@@ -6132,198 +6077,190 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
test("merge into with source missing fields in top-level struct using
dataframe API") {
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- val sourceTable = "cat.ns1.source_table"
- withTable(sourceTable) {
- // Target table has struct with 3 fields at top level
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
- |dep STRING)""".stripMargin)
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ val sourceTable = "cat.ns1.source_table"
+ withTable(sourceTable) {
+ // Target table has struct with 3 fields at top level
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRING, c3: BOOLEAN>,
+ |dep STRING)""".stripMargin)
- val targetData = Seq(
- Row(0, Row(1, "a", true), "sales")
- )
- val targetSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType),
- StructField("c3", BooleanType)
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
- .writeTo(tableNameAsString).append()
+ val targetData = Seq(
+ Row(0, Row(1, "a", true), "sales")
+ )
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
+ .writeTo(tableNameAsString).append()
- // Create source table with struct having only 2 fields (c1, c2) - missing c3
- val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))), // missing c3 field
- Column.create("dep", StringType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
+ // Create source table with struct having only 2 fields (c1, c2) - missing c3
+ val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))), // missing c3 field
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
- val data = Seq(
- Row(1, Row(10, "b"), "hr"),
- Row(2, Row(20, "c"), "engineering")
- )
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StringType)))),
- StructField("dep", StringType)))
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source_temp")
+ val data = Seq(
+ Row(1, Row(10, "b"), "hr"),
+ Row(2, Row(20, "c"), "engineering")
+ )
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))),
+ StructField("dep", StringType)))
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source_temp")
- sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+ sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
- if (coerceNestedTypes) {
- spark.table(sourceTable)
+ val mergeBuilder = spark.table(sourceTable)
.mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
- .merge()
- // Missing field c3 should be filled with NULL
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, "a", true), "sales"),
- Row(1, Row(10, "b", null), "hr"),
- Row(2, Row(20, "c", null), "engineering")))
- } else {
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- spark.table(sourceTable)
- .mergeInto(tableNameAsString, $"source_table.pk" === col(tableNameAsString + ".pk"))
- .whenMatched()
- .updateAll()
- .whenNotMatched()
- .insertAll()
- .merge()
+ if (coerceNestedTypes && withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+
+ // Missing field c3 should be filled with NULL
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, "a", true), "sales"),
+ Row(1, Row(10, "b", null), "hr"),
+ Row(2, Row(20, "c", null), "engineering")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `s`.`c3`."))
- }
- sql(s"DROP TABLE $tableNameAsString")
+ sql(s"DROP TABLE $tableNameAsString")
+ }
}
}
}
}
test("merge with null struct with missing nested field using dataframe API")
{
- Seq(true, false).foreach { coerceNestedTypes =>
- withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
- coerceNestedTypes.toString) {
- val sourceTable = "cat.ns1.source_table"
- withTable(sourceTable) {
- // Target table has nested struct with fields c1 and c2
- sql(
- s"""CREATE TABLE $tableNameAsString (
- |pk INT NOT NULL,
- |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
- |dep STRING)""".stripMargin)
-
- val targetData = Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, Row(2, Row(20, "y")), "hr")
- )
- val targetSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType),
- StructField("b", StringType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
- .writeTo(tableNameAsString).append()
+ Seq(true, false).foreach { withSchemaEvolution =>
+ Seq(true, false).foreach { coerceNestedTypes =>
+ withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key ->
+ coerceNestedTypes.toString) {
+ val sourceTable = "cat.ns1.source_table"
+ withTable(sourceTable) {
+ // Target table has nested struct with fields c1 and c2
+ sql(
+ s"""CREATE TABLE $tableNameAsString (
+ |pk INT NOT NULL,
+ |s STRUCT<c1: INT, c2: STRUCT<a: INT, b: STRING>>,
+ |dep STRING)""".stripMargin)
- // Create source table with missing nested field 'b'
- val sourceIdent = Identifier.of(Array("ns1"), "source_table")
- val columns = Array(
- Column.create("pk", IntegerType, false),
- Column.create("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- // missing field 'b'
- )))
- ))),
- Column.create("dep", StringType))
- val tableInfo = new TableInfo.Builder()
- .withColumns(columns)
- .withProperties(extraTableProps)
- .build()
- catalog.createTable(sourceIdent, tableInfo)
+ val targetData = Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, Row(2, Row(20, "y")), "hr")
+ )
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", StringType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(targetData), targetSchema)
+ .writeTo(tableNameAsString).append()
- // Source table has null for the nested struct
- val data = Seq(
- Row(1, null, "engineering"),
- Row(2, null, "finance")
- )
- val sourceTableSchema = StructType(Seq(
- StructField("pk", IntegerType),
- StructField("s", StructType(Seq(
- StructField("c1", IntegerType),
- StructField("c2", StructType(Seq(
- StructField("a", IntegerType)
- )))
- ))),
- StructField("dep", StringType)
- ))
- spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
- .createOrReplaceTempView("source_temp")
+ // Create source table with missing nested field 'b'
+ val sourceIdent = Identifier.of(Array("ns1"), "source_table")
+ val columns = Array(
+ Column.create("pk", IntegerType, false),
+ Column.create("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ // missing field 'b'
+ )))
+ ))),
+ Column.create("dep", StringType))
+ val tableInfo = new TableInfo.Builder()
+ .withColumns(columns)
+ .withProperties(extraTableProps)
+ .build()
+ catalog.createTable(sourceIdent, tableInfo)
- sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
- val mergeBuilder = spark.table(sourceTable)
- .mergeInto(tableNameAsString,
- $"source_table.pk" === col(tableNameAsString + ".pk"))
- .whenMatched()
- .updateAll()
- .whenNotMatched()
- .insertAll()
+ // Source table has null for the nested struct
+ val data = Seq(
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")
+ )
+ val sourceTableSchema = StructType(Seq(
+ StructField("pk", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)
+ )))
+ ))),
+ StructField("dep", StringType)
+ ))
+ spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema)
+ .createOrReplaceTempView("source_temp")
- if (coerceNestedTypes) {
- mergeBuilder.merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x")), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- // Without coercion, the merge should fail due to missing field
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
+ sql(s"INSERT INTO $sourceTable SELECT * FROM source_temp")
+ val mergeBuilder = spark.table(sourceTable)
+ .mergeInto(tableNameAsString,
+ $"source_table.pk" === col(tableNameAsString + ".pk"))
+ .whenMatched()
+ .updateAll()
+ .whenNotMatched()
+ .insertAll()
+
+ if (coerceNestedTypes && withSchemaEvolution) {
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x")), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
+ } else {
+ val exception = intercept[org.apache.spark.sql.AnalysisException] {
+ mergeBuilder.merge()
+ }
+ assert(exception.errorClass.get ==
+ "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
}
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `s`.`c2`.`b`."))
- }
- sql(s"DROP TABLE $tableNameAsString")
+ sql(s"DROP TABLE $tableNameAsString")
+ }
}
}
}
@@ -6409,37 +6346,21 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
.whenNotMatched()
.insertAll()
- if (coerceNestedTypes) {
- if (withSchemaEvolution) {
- // extra nested field is added
- mergeBuilder.withSchemaEvolution().merge()
- checkAnswer(
- sql(s"SELECT * FROM $tableNameAsString"),
- Seq(
- Row(0, Row(1, Row(10, "x", null)), "sales"),
- Row(1, null, "engineering"),
- Row(2, null, "finance")))
- } else {
- // extra nested field is not added
- val exception = intercept[org.apache.spark.sql.AnalysisException] {
- mergeBuilder.merge()
- }
- assert(exception.errorClass.get ==
- "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot write extra fields `c` to the struct `s`.`c2`"))
- }
+ if (coerceNestedTypes && withSchemaEvolution) {
+ // extra nested field is added
+ mergeBuilder.withSchemaEvolution().merge()
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(0, Row(1, Row(10, "x", null)), "sales"),
+ Row(1, null, "engineering"),
+ Row(2, null, "finance")))
} else {
- // Without source struct coercion, the merge should fail
val exception = intercept[org.apache.spark.sql.AnalysisException] {
mergeBuilder.merge()
}
assert(exception.errorClass.get ==
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA")
- assert(exception.getMessage.contains(
- "Cannot write incompatible data for the table ``: " +
- "Cannot find data for the output column `s`.`c2`.`b`."))
}
sql(s"DROP TABLE $tableNameAsString")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 18042bf73adf..8e5ee1644f9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1633,10 +1633,10 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
if (starInUpdate) {
assert(updateAssigns.size == 2)
- assert(updateAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ts))
- assert(updateAssigns(0).value.asInstanceOf[AttributeReference].sameRef(ss))
- assert(updateAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ti))
- assert(updateAssigns(1).value.asInstanceOf[AttributeReference].sameRef(si))
+ assert(updateAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
+ assert(updateAssigns(0).value.asInstanceOf[AttributeReference].sameRef(si))
+ assert(updateAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ts))
+ assert(updateAssigns(1).value.asInstanceOf[AttributeReference].sameRef(ss))
} else {
assert(updateAssigns.size == 1)
assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ts))
@@ -1656,10 +1656,10 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
if (starInInsert) {
assert(insertAssigns.size == 2)
- assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ts))
- assert(insertAssigns(0).value.asInstanceOf[AttributeReference].sameRef(ss))
- assert(insertAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ti))
- assert(insertAssigns(1).value.asInstanceOf[AttributeReference].sameRef(si))
+ assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
+ assert(insertAssigns(0).value.asInstanceOf[AttributeReference].sameRef(si))
+ assert(insertAssigns(1).key.asInstanceOf[AttributeReference].sameRef(ts))
+ assert(insertAssigns(1).value.asInstanceOf[AttributeReference].sameRef(ss))
} else {
assert(insertAssigns.size == 2)
assert(insertAssigns(0).key.asInstanceOf[AttributeReference].sameRef(ti))
@@ -1720,8 +1720,40 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
}
+ // star with schema evolution
+ val sqlStarSchemaEvolution =
+ s"""
+ |MERGE WITH SCHEMA EVOLUTION INTO $target AS target
+ |USING $source AS source
+ |ON target.i = source.i
+ |WHEN MATCHED AND (target.s='delete') THEN DELETE
+ |WHEN MATCHED AND (target.s='update') THEN UPDATE SET *
+ |WHEN NOT MATCHED AND (source.s='insert') THEN INSERT *
+ """.stripMargin
+ parseAndResolve(sqlStarSchemaEvolution) match {
+ case MergeIntoTable(
+ SubqueryAlias(AliasIdentifier("target", Seq()),
AsDataSourceV2Relation(target)),
+ SubqueryAlias(AliasIdentifier("source", Seq()),
AsDataSourceV2Relation(source)),
+ mergeCondition,
+ Seq(DeleteAction(Some(EqualTo(dl: AttributeReference, StringLiteral("delete")))),
+ UpdateAction(Some(EqualTo(ul: AttributeReference,
+ StringLiteral("update"))), updateAssigns, _)),
+ Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))),
+ insertAssigns)),
+ Seq(),
+ withSchemaEvolution) =>
+ checkMergeConditionResolution(target, source, mergeCondition)
+ checkMatchedClausesResolution(target, source, Some(dl), Some(ul), updateAssigns,
+ starInUpdate = true)
+ checkNotMatchedClausesResolution(target, source, Some(il), insertAssigns,
+ starInInsert = true)
+ assert(withSchemaEvolution === true)
+
+ case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
+ }
+
// star
- val sql2 =
+ val sqlStarWithoutSchemaEvolution =
s"""
|MERGE INTO $target AS target
|USING $source AS source
@@ -1730,7 +1762,7 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
|WHEN MATCHED AND (target.s='update') THEN UPDATE SET *
|WHEN NOT MATCHED AND (source.s='insert') THEN INSERT *
""".stripMargin
- parseAndResolve(sql2) match {
+ parseAndResolve(sqlStarWithoutSchemaEvolution) match {
case MergeIntoTable(
SubqueryAlias(AliasIdentifier("target", Seq()),
AsDataSourceV2Relation(target)),
SubqueryAlias(AliasIdentifier("source", Seq()),
AsDataSourceV2Relation(source)),
@@ -2336,24 +2368,11 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
|USING testcat.tab2
|ON 1 = 1
|WHEN MATCHED THEN UPDATE SET *""".stripMargin
- val parsed2 = parseAndResolve(sql2)
- parsed2 match {
- case MergeIntoTable(
- AsDataSourceV2Relation(target),
- AsDataSourceV2Relation(source),
- EqualTo(IntegerLiteral(1), IntegerLiteral(1)),
- Seq(UpdateAction(None, updateAssigns, _)), // Matched actions
- Seq(), // Not matched actions
- Seq(), // Not matched by source actions
- withSchemaEvolution) =>
- val ti = target.output.find(_.name == "i").get
- val si = source.output.find(_.name == "i").get
- assert(updateAssigns.size == 1)
- assert(updateAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
- assert(updateAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
- assert(withSchemaEvolution === false)
- case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
- }
+ checkError(
+ exception = intercept[AnalysisException](parseAndResolve(sql2)),
+ condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+ parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"),
+ context = ExpectedContext(fragment = sql2, start = 0, stop = 80))
// INSERT * with incompatible schema between source and target tables.
val sql3 =
@@ -2361,24 +2380,11 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
|USING testcat.tab2
|ON 1 = 1
|WHEN NOT MATCHED THEN INSERT *""".stripMargin
- val parsed3 = parseAndResolve(sql3)
- parsed3 match {
- case MergeIntoTable(
- AsDataSourceV2Relation(target),
- AsDataSourceV2Relation(source),
- EqualTo(IntegerLiteral(1), IntegerLiteral(1)),
- Seq(), // Matched action
- Seq(InsertAction(None, insertAssigns)), // Not matched actions
- Seq(), // Not matched by source actions
- withSchemaEvolution) =>
- val ti = target.output.find(_.name == "i").get
- val si = source.output.find(_.name == "i").get
- assert(insertAssigns.size == 1)
- assert(insertAssigns.head.key.asInstanceOf[AttributeReference].sameRef(ti))
- assert(insertAssigns.head.value.asInstanceOf[AttributeReference].sameRef(si))
- assert(withSchemaEvolution === false)
- case other => fail("Expect MergeIntoTable, but got:\n" + other.treeString)
- }
+ checkError(
+ exception = intercept[AnalysisException](parseAndResolve(sql3)),
+ condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+ parameters = Map("objectName" -> "`s`", "proposal" -> "`i`, `x`"),
+ context = ExpectedContext(fragment = sql3, start = 0, stop = 80))
val sql4 =
"""