This is an automated email from the ASF dual-hosted git repository.

szehon-ho pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 4b81c4f0cabc [SPARK-57197][SQL] Improve test coverage for merge into 
schema evolution
4b81c4f0cabc is described below

commit 4b81c4f0cabc1cdf920f6dea5fd7370aab997dfd
Author: Eric Yang <[email protected]>
AuthorDate: Wed Jun 3 12:53:49 2026 -0700

    [SPARK-57197][SQL] Improve test coverage for merge into schema evolution
    
    ### What changes were proposed in this pull request?
    This is a follow-up PR from the comment 
https://github.com/apache/spark/pull/55329#issuecomment-4580763636 - to add 
test coverage for MERGE INTO schema evolution:
    - Nested field with a special-char name.
    - Special-char column that already exists in the target.
    
    ### Why are the changes needed?
    To increase test coverage of MERGE INTO schema evolution.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added test cases
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes. Claude Code
    
    Closes #56242 from jiwen624/spark-56462-followup-test-coverage.
    
    Authored-by: Eric Yang <[email protected]>
    Signed-off-by: Szehon Ho <[email protected]>
    (cherry picked from commit 7d7c623247d05107618f691812bfe89fb6d6fe2d)
    Signed-off-by: Szehon Ho <[email protected]>
---
 .../MergeIntoSchemaEvolutionBasicTests.scala       | 213 +++++++++++++++++++++
 1 file changed, 213 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala
index bb221a200b2d..23bc5028981e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionBasicTests.scala
@@ -1303,4 +1303,217 @@ trait MergeIntoSchemaEvolutionBasicTests extends 
MergeIntoSchemaEvolutionSuiteBa
       ))
     )
   }
+
+  for (subFieldName <- Seq("job.title", "job title")) {
+    testNestedStructsEvolution(
+      s"source struct has extra nested field with special-char name: 
$subFieldName")(
+      target = Seq(
+        """{ "pk": 1, "info": { "name": "Alice" }, "dep": "hr" }""",
+        """{ "pk": 2, "info": { "name": "Bob" }, "dep": "finance" }"""
+      ),
+      source = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer" 
}, "dep": "hr" }""",
+        s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager" 
}, "dep": "sales" }"""
+      ),
+      targetSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      sourceSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      clauses = Seq(updateAll(), insertAll()),
+      result = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice2", "$subFieldName": "engineer" 
}, "dep": "hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": null }, 
"dep": "finance" }""",
+        s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": "manager" 
}, "dep": "sales" }"""
+      ),
+      resultSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      expectErrorWithoutEvolutionContains = "Cannot write extra fields"
+    )
+  }
+
+  for (colName <- Seq("job.title", "job title")) {
+    testEvolution(
+      s"special-char column already in target gets updated with type widening: 
$colName")(
+      targetData = {
+        val schema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("salary", IntegerType),
+          StructField("dep", StringType),
+          StructField(colName, ShortType)
+        ))
+        spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+          Row(1, 100, "hr", 1.toShort),
+          Row(2, 200, "software", 2.toShort),
+          Row(3, 300, "hr", 3.toShort)
+        )), schema)
+      },
+      sourceData = {
+        val schema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("salary", IntegerType),
+          StructField("dep", StringType),
+          StructField(colName, IntegerType)
+        ))
+        spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+          Row(2, 150, "finance", 50000),
+          Row(4, 400, "finance", 60000)
+        )), schema)
+      },
+      clauses = Seq(updateAll(), insertAll()),
+      expected = Seq(
+        (1, 100, "hr", 1),
+        (2, 150, "finance", 50000),
+        (3, 300, "hr", 3),
+        (4, 400, "finance", 60000)
+      ).toDF("pk", "salary", "dep", colName),
+      expectedSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType),
+        StructField(colName, IntegerType)
+      )),
+      expectErrorWithoutEvolutionContains =
+        "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type 
column or variable"
+    )
+  }
+
+  for (subFieldName <- Seq("job.title", "job title")) {
+    testNestedStructsEvolution(
+      s"nested special-char field already in target gets updated with type 
widening:" +
+        s" $subFieldName")(
+      target = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep": 
"hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": 2 }, "dep": 
"software" }""",
+        s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 }, 
"dep": "hr" }"""
+      ),
+      source = Seq(
+        s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 }, 
"dep": "finance" }""",
+        s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 }, 
"dep": "finance" }"""
+      ),
+      targetSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, ShortType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      sourceSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, IntegerType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      clauses = Seq(updateAll(), insertAll()),
+      result = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": 1 }, "dep": 
"hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": 50000 }, 
"dep": "finance" }""",
+        s"""{ "pk": 3, "info": { "name": "Charlie", "$subFieldName": 3 }, 
"dep": "hr" }""",
+        s"""{ "pk": 4, "info": { "name": "Diana", "$subFieldName": 60000 }, 
"dep": "finance" }"""
+      ),
+      resultSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, IntegerType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      expectErrorWithoutEvolutionContains =
+        "Fail to assign a value of \"INT\" type to the \"SMALLINT\" type 
column or variable"
+    )
+  }
+
+  for (colName <- Seq("job.title", "job title")) {
+    testEvolution(
+      s"target has special-char column missing from source: $colName")(
+      targetData = Seq(
+        (1, 100, "hr", "engineer"),
+        (2, 200, "finance", "manager"),
+        (3, 300, "hr", "analyst")
+      ).toDF("pk", "salary", "dep", colName),
+      sourceData = Seq(
+        (2, 150, "sales"),
+        (4, 400, "engineering")
+      ).toDF("pk", "salary", "dep"),
+      clauses = Seq(updateAll(), insertAll()),
+      expected = Seq[(Int, Int, String, String)](
+        (1, 100, "hr", "engineer"),
+        (2, 150, "sales", "manager"),
+        (3, 300, "hr", "analyst"),
+        (4, 400, "engineering", null)
+      ).toDF("pk", "salary", "dep", colName),
+      expectedSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("salary", IntegerType, nullable = false),
+        StructField("dep", StringType),
+        StructField(colName, StringType)
+      )),
+      expectErrorWithoutEvolutionContains = "cannot be resolved"
+    )
+  }
+
+  for (subFieldName <- Seq("job.title", "job title")) {
+    testNestedStructsEvolution(
+      s"target struct has nested special-char field missing from source: 
$subFieldName")(
+      target = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": "engineer" 
}, "dep": "hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob", "$subFieldName": "manager" }, 
"dep": "finance" }"""
+      ),
+      source = Seq(
+        """{ "pk": 2, "info": { "name": "Bob2" }, "dep": "sales" }""",
+        """{ "pk": 3, "info": { "name": "Cathy" }, "dep": "engineering" }"""
+      ),
+      targetSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      sourceSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      clauses = Seq(updateAll(), insertAll()),
+      result = Seq(
+        s"""{ "pk": 1, "info": { "name": "Alice", "$subFieldName": "engineer" 
}, "dep": "hr" }""",
+        s"""{ "pk": 2, "info": { "name": "Bob2", "$subFieldName": "manager" }, 
"dep": "sales" }""",
+        s"""{ "pk": 3, "info": { "name": "Cathy", "$subFieldName": null }, 
"dep": "engineering" }"""
+      ),
+      resultSchema = StructType(Seq(
+        StructField("pk", IntegerType, nullable = false),
+        StructField("info", StructType(Seq(
+          StructField("name", StringType),
+          StructField(subFieldName, StringType)
+        ))),
+        StructField("dep", StringType)
+      )),
+      expectErrorWithoutEvolutionContains = "Cannot find data for the output 
column",
+      requiresNestedTypeCoercion = true
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to