This is an automated email from the ASF dual-hosted git repository.
szehon-ho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7d7c623247d0 [SPARK-57197][SQL] Improve test coverage for merge into schema evolution
7d7c623247d0 is described below
commit 7d7c623247d05107618f691812bfe89fb6d6fe2d
Author: Eric Yang <[email protected]>
AuthorDate: Wed Jun 3 12:53:49 2026 -0700
[SPARK-57197][SQL] Improve test coverage for merge into schema evolution
### What changes were proposed in this pull request?
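This is a follow-up to the review comment
https://github.com/apache/spark/pull/55329#issuecomment-4580763636, adding
test coverage for MERGE INTO schema evolution with special-char names (one
containing a dot, one containing a space):
- Nested field with a special-char name that exists only in the source.
- Special-char column (top-level or nested) that already exists in the target
  and is widened from SMALLINT to INT.
- Special-char column (top-level or nested) that exists in the target but is
  missing from the source.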
### Why are the changes needed?
To increase test coverage of MERGE INTO schema evolution.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added test cases
### Was this patch authored or co-authored using generative AI tooling?
Yes. Claude Code
Closes #56242 from jiwen624/spark-56462-followup-test-coverage.
Authored-by: Eric Yang <[email protected]>
Signed-off-by: Szehon Ho <[email protected]>
---
.../MergeIntoSchemaEvolutionBasicTests.scala | 213 +++++++++++++++++++++
1 file changed, 213 insertions(+)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala
index bb221a200b2d..23bc5028981e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala
@@ -1303,4 +1303,217 @@ trait MergeIntoSchemaEvolutionBasicTests extends MergeIntoSchemaEvolutionSuiteBa
))
)
}
+
+  // Registered once per special-char sub-field name: one containing a dot, one containing
+  // a space. The source `info` struct carries a sub-field that the target struct lacks.
+  // With schema evolution the target struct gains the sub-field, and the target-only row
+  // (pk = 2, absent from the source) reads null for it. The non-evolution run is expected
+  // to fail with an error containing "Cannot write extra fields".
+  for (subFieldName <- Seq("job.title", "job title")) {
+    testNestedStructsEvolution(
+      s"source struct has extra nested field with special-char name: $subFieldName")(
+      target = Seq(
+        """{ "pk": 1, "info": { "name": "Alice" }, "dep": "hr" }""",
+        """{ "pk": 2, "info": { "name": "Bob" }, "dep": "finance" }"""
+      ),
+      source = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer" }, "dep": "hr" }""",
+        s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager" }, "dep": "sales" }"""
+      ),
+      targetSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      sourceSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      clauses = Seq(updateAll(), insertAll()),
+      result = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer" }, "dep": "hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": null }, "dep": "finance" }""",
+        s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager" }, "dep": "sales" }"""
+      ),
+      resultSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      expectErrorWithoutEvolutionContains = "Cannot write extra fields"
+    )
+  }
+
+  // Registered once per special-char column name: one containing a dot, one containing a
+  // space. The column already exists in the target as SMALLINT, while the source supplies
+  // INT. With schema evolution the column is widened to INT, and updated/inserted rows
+  // (pk = 2, pk = 4) take the source values. The non-evolution run is expected to fail
+  // because an INT value cannot be assigned to the SMALLINT column.
+  for (colName <- Seq("job.title", "job title")) {
+    testEvolution(
+      s"special-char column already in target gets updated with type widening: $colName")(
+      targetData = {
+        val schema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("salary", IntegerType),
+          StructField("dep", StringType),
+          StructField(colName, ShortType)
+        ))
+        spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+          Row(1, 100, "hr", 1.toShort),
+          Row(2, 200, "software", 2.toShort),
+          Row(3, 300, "hr", 3.toShort)
+        )), schema)
+      },
+      sourceData = {
+        val schema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("salary", IntegerType),
+          StructField("dep", StringType),
+          StructField(colName, IntegerType)
+        ))
+        spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+          Row(2, 150, "finance", 50000),
+          Row(4, 400, "finance", 60000)
+        )), schema)
+      },
+      clauses = Seq(updateAll(), insertAll()),
+      expected = Seq(
+        (1, 100, "hr", 1),
+        (2, 150, "finance", 50000),
+        (3, 300, "hr", 3),
+        (4, 400, "finance", 60000)
+      ).toDF("pk", "salary", "dep", colName),
+      expectedSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType),
+        StructField(colName, IntegerType)
+      )),
+      expectErrorWithoutEvolutionContains =
+        "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type column or variable"
+    )
+  }
+
+  // Registered once per special-char sub-field name: one containing a dot, one containing
+  // a space. The sub-field already exists in the target `info` struct as SMALLINT, while
+  // the source supplies INT. With schema evolution it is widened to INT, and updated/inserted
+  // rows (pk = 2, pk = 4) take the source values. The non-evolution run is expected to fail
+  // because an INT value cannot be assigned to the SMALLINT sub-field.
+  for (subFieldName <- Seq("job.title", "job title")) {
+    testNestedStructsEvolution(
+      // Only the second fragment interpolates, so only it carries the `s` prefix.
+      "nested special-char field already in target gets updated with type widening:" +
+        s" $subFieldName")(
+      target = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep": "hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": 2 }, "dep": "software" }""",
+        s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 }, "dep": "hr" }"""
+      ),
+      source = Seq(
+        s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 }, "dep": "finance" }""",
+        s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 }, "dep": "finance" }"""
+      ),
+      targetSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, ShortType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      sourceSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, IntegerType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      clauses = Seq(updateAll(), insertAll()),
+      result = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep": "hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 }, "dep": "finance" }""",
+        s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 }, "dep": "hr" }""",
+        s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 }, "dep": "finance" }"""
+      ),
+      resultSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, IntegerType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      expectErrorWithoutEvolutionContains =
+        "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type column or variable"
+    )
+  }
+
+  // Registered once per special-char column name: one containing a dot, one containing a
+  // space. The column exists only in the target. With schema evolution the updated row
+  // (pk = 2) keeps its existing target value and the inserted row (pk = 4) gets null.
+  // The non-evolution run is expected to fail with an error containing "cannot be resolved".
+  Seq("job.title", "job title").foreach { specialCol =>
+    // Columns shared by every DataFrame below; the target/expected frames append specialCol.
+    val baseCols = Seq("pk", "salary", "dep")
+    val existingRows = Seq(
+      (1, 100, "hr", "engineer"),
+      (2, 200, "finance", "manager"),
+      (3, 300, "hr", "analyst"))
+    val incomingRows = Seq(
+      (2, 150, "sales"),
+      (4, 400, "engineering"))
+    // Explicit tuple type: the null in the last row would otherwise widen the inferred type.
+    val mergedRows = Seq[(Int, Int, String, String)](
+      (1, 100, "hr", "engineer"),
+      (2, 150, "sales", "manager"),
+      (3, 300, "hr", "analyst"),
+      (4, 400, "engineering", null))
+
+    testEvolution(s"target has special-char column missing from source: $specialCol")(
+      targetData = existingRows.toDF(baseCols :+ specialCol: _*),
+      sourceData = incomingRows.toDF(baseCols: _*),
+      clauses = Seq(updateAll(), insertAll()),
+      expected = mergedRows.toDF(baseCols :+ specialCol: _*),
+      expectedSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("salary", IntegerType, nullable = false),
+        StructField("dep", StringType),
+        StructField(specialCol, StringType))),
+      expectErrorWithoutEvolutionContains = "cannot be resolved")
+  }
+
+  // Registered once per special-char sub-field name: one containing a dot, one containing
+  // a space. The target `info` struct has a sub-field that the source struct lacks.
+  // With schema evolution the updated row (pk = 2) keeps its existing target value for the
+  // sub-field and the inserted row (pk = 3) gets null. The non-evolution run is expected to
+  // fail with an error containing "Cannot find data for the output column".
+  // NOTE(review): requiresNestedTypeCoercion = true presumably restricts this case to
+  // configurations with nested type coercion enabled; confirm against the helper.
+  for (subFieldName <- Seq("job.title", "job title")) {
+    testNestedStructsEvolution(
+      s"target struct has nested special-char field missing from source: $subFieldName")(
+      target = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": "engineer" }, "dep": "hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": "manager" }, "dep": "finance" }"""
+      ),
+      source = Seq(
+        """{ "pk": 2, "info": { "name": "Bob2" }, "dep": "sales" }""",
+        """{ "pk": 3, "info": { "name": "Cathy" }, "dep": "engineering" }"""
+      ),
+      targetSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      sourceSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      clauses = Seq(updateAll(), insertAll()),
+      result = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": "engineer" }, "dep": "hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": "manager" }, "dep": "sales" }""",
+        s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": null },""" +
+          """ "dep": "engineering" }"""
+      ),
+      resultSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      expectErrorWithoutEvolutionContains = "Cannot find data for the output column",
+      requiresNestedTypeCoercion = true
+    )
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]