This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 83c313caa5d2 [MINOR] Support schema evolution with nested data type (#17531)
83c313caa5d2 is described below
commit 83c313caa5d238da5e7c63d70a50a1f2343d1cbb
Author: Lin Liu <[email protected]>
AuthorDate: Mon Feb 9 19:50:02 2026 -0800
[MINOR] Support schema evolution with nested data type (#17531)
---
.../apache/spark/sql/HoodieInternalRowUtils.scala | 6 +-
.../TestComplexTypeSchemaEvolution.scala | 471 +++++++++++++++++++++
2 files changed, 474 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index f3eb2214ea22..ddd277c8d423 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -210,15 +210,15 @@ object HoodieInternalRowUtils {
case (newStructType: StructType, prevStructType: StructType) =>
val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)
- val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
- val rowUpdater = new RowUpdater(newRow)
-
(fieldUpdater, ordinal, value) => {
// Here new row is built in 2 stages:
// - First, we pass mutable row (used as buffer/scratchpad) created above wrapped into [[RowUpdater]]
// into generated row-writer
// - Upon returning from row-writer, we call back into parent row's [[fieldUpdater]] to set returned
// row as a value in it
+ // NOTE: Create a new row for each element to avoid reusing the same row object across array elements
+ val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+ val rowUpdater = new RowUpdater(newRow)
writer(rowUpdater, value)
fieldUpdater.set(ordinal, newRow)
}
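
The gist of the hunk above: previously a single mutable SpecificInternalRow was allocated once outside the closure and reused as the write buffer for every element, so all elements of an array could end up referencing the same row object and only the last-written values survived; moving the allocation inside the per-element closure gives each element its own row. A minimal sketch in plain Scala of that aliasing pitfall (hypothetical MutableRow class, not Hudi's actual SpecificInternalRow/RowUpdater):

// Hypothetical plain-Scala sketch of the aliasing pitfall the NOTE above refers to:
// one shared mutable buffer stored for every array element means all elements alias
// the same object, so only the last element's values survive.
object ReusedBufferSketch {
  final class MutableRow(var a: Int, var b: Int) {
    override def toString: String = s"($a, $b)"
  }

  def main(args: Array[String]): Unit = {
    val inputs = Seq((1, 11), (2, 22), (3, 33))

    // Buffer created once, outside the per-element closure (the pre-fix shape).
    val shared = new MutableRow(0, 0)
    val corrupted = inputs.map { case (a, b) =>
      shared.a = a
      shared.b = b
      shared // every element is the same object
    }
    println(corrupted.mkString(", ")) // prints (3, 33), (3, 33), (3, 33)

    // Fresh row per element (the post-fix shape).
    val correct = inputs.map { case (a, b) => new MutableRow(a, b) }
    println(correct.mkString(", "))   // prints (1, 11), (2, 22), (3, 33)
  }
}
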
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestComplexTypeSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestComplexTypeSchemaEvolution.scala
new file mode 100644
index 000000000000..ae4481229297
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestComplexTypeSchemaEvolution.scala
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieStorageConfig
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, ScalaAssertionSupport}
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType, StructType}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util.stream.Stream
+
+/**
+ * Test to verify schema evolution for complex data types (Array[Struct], Nested Structs, Array[Map[Struct]])
+ * with record mergers. These tests verify that schema evolution of complex nested fields
+ * works correctly and doesn't cause data corruption when using record mergers.
+ */
+class TestComplexTypeSchemaEvolution extends HoodieSparkClientTestBase with ScalaAssertionSupport {
+ var spark: SparkSession = _
+
+ @BeforeEach override def setUp(): Unit = {
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initHoodieStorage()
+ }
+
+ @AfterEach override def tearDown(): Unit = {
+ cleanupSparkContexts()
+ cleanupFileSystem()
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testParameters"))
+ def testArrayStructSchemaEvolutionWithRecordMerger(tableType: String, logFormat: String): Unit = {
+ val tablePath = basePath + s"/array_struct_evolution_${tableType}_${logFormat}"
+ val tableName = s"array_struct_evolution_${tableType}_${logFormat}"
+
+ // ==========================================================
+ // STEP 1: Initial schema (no evolution yet)
+ // ==========================================================
+ val schemaV1 = new StructType()
+ .add("id", StringType, nullable = true)
+ .add("items", ArrayType(new StructType()
+ .add("a", IntegerType, nullable = true)
+ .add("b", IntegerType, nullable = true)
+ .add("c", IntegerType, nullable = true)
+ .add("d", IntegerType, nullable = true)
+ ), nullable = true)
+ val row1Items = Seq(
+ Row(1, 11, 111, 1111),
+ Row(2, 22, 222, 2222),
+ Row(3, 33, 333, 3333)
+ )
+ val row2Items = Seq(
+ Row(10, 77, 777, 7777),
+ Row(20, 88, 888, 8888),
+ Row(30, 99, 999, 9999)
+ )
+ val initialData = Seq(
+ Row("1", row1Items),
+ Row("2", row2Items)
+ )
+ val dfInit = spark.createDataFrame(spark.sparkContext.parallelize(initialData), schemaV1)
+
+ // ==========================================================
+ // STEP 2: Write initial data using INSERT (not bulk-insert)
+ // ==========================================================
+ var hudiOpts = Map(
+ HoodieTableConfig.NAME.key -> tableName,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
+ DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName
+ )
+ // Add log format for MOR tables
+ if (tableType == DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) {
+ hudiOpts = hudiOpts + (HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logFormat)
+ }
+ dfInit.write.format("hudi").options(hudiOpts).mode(SaveMode.Overwrite).save(tablePath)
+ // Verify initial data
+ val dfAfterInsert = spark.read.format("hudi").load(tablePath)
+ assertEquals(2, dfAfterInsert.count(), "Should have 2 records after initial insert")
+
+ // ==========================================================
+ // STEP 3: Schema evolution - Add a new field to the struct inside the array
+ // ==========================================================
+ val schemaV2 = new StructType()
+ .add("id", StringType, nullable = true)
+ .add("items", ArrayType(new StructType()
+ .add("a", IntegerType, nullable = true)
+ .add("b", IntegerType, nullable = true)
+ .add("c", IntegerType, nullable = true)
+ .add("d", IntegerType, nullable = true)
+ .add("e", IntegerType, nullable = true)
+ ), nullable = true)
+ val row1ItemsEvolved = Seq(
+ Row(1, 11, 111, 1111, 11111),
+ Row(2, 22, 222, 2222, 22222),
+ Row(3, 33, 333, 3333, 33333)
+ )
+ val dfEvolved = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", row1ItemsEvolved))),
+ schemaV2
+ )
+
+ // ==========================================================
+ // STEP 4: Upsert with HoodieSparkRecordMerger
+ // ==========================================================
+ var hudiOptsUpsert = Map(
+ HoodieTableConfig.NAME.key -> tableName,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
+ DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> "org.apache.hudi.HoodieSparkRecordMerger"
+ )
+ // Add log format for MOR tables
+ if (tableType == DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) {
+ hudiOptsUpsert = hudiOptsUpsert + (HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logFormat)
+ }
+ dfEvolved.write.format("hudi").options(hudiOptsUpsert).mode(SaveMode.Append).save(tablePath)
+
+ // ==========================================================
+ // STEP 5: Load after upsert and verify data integrity
+ // ==========================================================
+ val dfFinal = spark.read.format("hudi").load(tablePath)
+ // Verify we still have 2 records
+ assertEquals(2, dfFinal.count(), "Should still have 2 records after upsert")
+ // Verify the schema includes the new field 'e'
+ val finalSchema = dfFinal.schema
+ val itemsField = finalSchema.fields.find(_.name == "items").get
+ assertTrue(itemsField.dataType.isInstanceOf[ArrayType], "items should be an ArrayType")
+ val arrayType = itemsField.dataType.asInstanceOf[ArrayType]
+ assertTrue(arrayType.elementType.isInstanceOf[StructType], "items array element should be StructType")
+ val structType = arrayType.elementType.asInstanceOf[StructType]
+ val fieldNames = structType.fields.map(_.name)
+ assertTrue(fieldNames.contains("e"), "Schema should include the evolved field 'e'")
+ assertEquals(5, fieldNames.length, "Struct should have 5 fields (a, b, c, d, e)")
+ // Verify we can read all data without errors (this would fail if data is corrupted)
+ dfFinal.foreach(_ => {})
+
+ // Verify data for id="1" (updated record)
+ val record1 = dfFinal.filter("id = '1'").collect()
+ assertEquals(1, record1.length, "Should have exactly one record with id='1'")
+ val items1 = record1(0).getAs[scala.collection.Seq[Row]]("items").toSeq
+ assertNotNull(items1, "items should not be null for id='1'")
+ assertEquals(3, items1.length, "id='1' should have 3 items")
+ // Verify first item of id="1" has all fields including 'e'
+ val firstItem1 = items1.head
+ assertEquals(1, firstItem1.getInt(0), "First item 'a' should be 1")
+ assertEquals(11, firstItem1.getInt(1), "First item 'b' should be 11")
+ assertEquals(111, firstItem1.getInt(2), "First item 'c' should be 111")
+ assertEquals(1111, firstItem1.getInt(3), "First item 'd' should be 1111")
+ assertEquals(11111, firstItem1.getInt(4), "First item 'e' should be 11111")
+
+ // Verify second item of id="1"
+ val secondItem1 = items1(1)
+ assertEquals(2, secondItem1.getInt(0), "Second item 'a' should be 2")
+ assertEquals(22, secondItem1.getInt(1), "Second item 'b' should be 22")
+ assertEquals(222, secondItem1.getInt(2), "Second item 'c' should be 222")
+ assertEquals(2222, secondItem1.getInt(3), "Second item 'd' should be 2222")
+ assertEquals(22222, secondItem1.getInt(4), "Second item 'e' should be 22222")
+ // Verify data for id="2" (unchanged record - should have null for 'e')
+ val record2 = dfFinal.filter("id = '2'").collect()
+ assertEquals(1, record2.length, "Should have exactly one record with id='2'")
+ val items2 = record2(0).getAs[scala.collection.Seq[Row]]("items").toSeq
+ assertNotNull(items2, "items should not be null for id='2'")
+ assertEquals(3, items2.length, "id='2' should have 3 items")
+ // Verify first item of id="2" - should have original values and null for 'e'
+ val firstItem2 = items2(0)
+ assertEquals(10, firstItem2.getInt(0), "First item 'a' should be 10")
+ assertEquals(77, firstItem2.getInt(1), "First item 'b' should be 77")
+ assertEquals(777, firstItem2.getInt(2), "First item 'c' should be 777")
+ assertEquals(7777, firstItem2.getInt(3), "First item 'd' should be 7777")
+ assertTrue(firstItem2.isNullAt(4), "First item 'e' should be null for unchanged record")
+ // Verify second item of id="2"
+ val secondItem2 = items2(1)
+ assertEquals(20, secondItem2.getInt(0), "Second item 'a' should be 20")
+ assertEquals(88, secondItem2.getInt(1), "Second item 'b' should be 88")
+ assertEquals(888, secondItem2.getInt(2), "Second item 'c' should be 888")
+ assertEquals(8888, secondItem2.getInt(3), "Second item 'd' should be 8888")
+ assertTrue(secondItem2.isNullAt(4), "Second item 'e' should be null for unchanged record")
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testParameters"))
+ def testNestedStructSchemaEvolutionWithRecordMerger(tableType: String, logFormat: String): Unit = {
+ val tablePath = basePath + s"/nested_struct_evolution_${tableType}_${logFormat}"
+ val tableName = s"nested_struct_evolution_${tableType}_${logFormat}"
+
+ // ==========================================================
+ // STEP 1: Initial schema with nested struct (no evolution yet)
+ // ==========================================================
+ val nestedStructV1 = new StructType()
+ .add("x", IntegerType, nullable = true)
+ .add("y", IntegerType, nullable = true)
+ .add("z", IntegerType, nullable = true)
+ val schemaV1 = new StructType()
+ .add("id", StringType, nullable = true)
+ .add("location", nestedStructV1, nullable = true)
+ .add("name", StringType, nullable = true)
+ val initialData = Seq(
+ Row("1", Row(10, 20, 30), "Location1"),
+ Row("2", Row(40, 50, 60), "Location2")
+ )
+ val dfInit = spark.createDataFrame(spark.sparkContext.parallelize(initialData), schemaV1)
+
+ // ==========================================================
+ // STEP 2: Write initial data using INSERT
+ // ==========================================================
+ var hudiOpts = Map(
+ HoodieTableConfig.NAME.key -> tableName,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
+ DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName
+ )
+ // Add log format for MOR tables
+ if (tableType == DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) {
+ hudiOpts = hudiOpts + (HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logFormat)
+ }
+ dfInit.write.format("hudi").options(hudiOpts).mode(SaveMode.Overwrite).save(tablePath)
+ val dfAfterInsert = spark.read.format("hudi").load(tablePath)
+ assertEquals(2, dfAfterInsert.count(), "Should have 2 records after initial insert")
+
+ // ==========================================================
+ // STEP 3: Schema evolution - Add a new field to the nested struct
+ // ==========================================================
+ val nestedStructV2 = new StructType()
+ .add("x", IntegerType, nullable = true)
+ .add("y", IntegerType, nullable = true)
+ .add("z", IntegerType, nullable = true)
+ .add("w", IntegerType, nullable = true)
+ val schemaV2 = new StructType()
+ .add("id", StringType, nullable = true)
+ .add("location", nestedStructV2, nullable = true)
+ .add("name", StringType, nullable = true)
+ val evolvedData = Seq(
+ Row("1", Row(10, 20, 30, 40), "Location1")
+ )
+ val dfEvolved = spark.createDataFrame(
+ spark.sparkContext.parallelize(evolvedData),
+ schemaV2
+ )
+
+ // ==========================================================
+ // STEP 4: Upsert with HoodieSparkRecordMerger
+ // ==========================================================
+ var hudiOptsUpsert = Map(
+ HoodieTableConfig.NAME.key -> tableName,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
+ DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> "org.apache.hudi.HoodieSparkRecordMerger"
+ )
+ // Add log format for MOR tables
+ if (tableType == DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) {
+ hudiOptsUpsert = hudiOptsUpsert + (HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logFormat)
+ }
+ dfEvolved.write.format("hudi").options(hudiOptsUpsert).mode(SaveMode.Append).save(tablePath)
+
+ // ==========================================================
+ // STEP 5: Load after upsert and verify data integrity
+ // ==========================================================
+ val dfFinal = spark.read.format("hudi").load(tablePath)
+ assertEquals(2, dfFinal.count(), "Should still have 2 records after upsert")
+
+ // Verify the schema includes the new field 'w' in nested struct
+ val finalSchema = dfFinal.schema
+ val locationField = finalSchema.fields.find(_.name == "location").get
+ assertTrue(locationField.dataType.isInstanceOf[StructType], "location should be a StructType")
+ val nestedStructType = locationField.dataType.asInstanceOf[StructType]
+ val fieldNames = nestedStructType.fields.map(_.name)
+ assertTrue(fieldNames.contains("w"), "Schema should include the evolved field 'w'")
+ assertEquals(4, fieldNames.length, "Nested struct should have 4 fields (x, y, z, w)")
+ dfFinal.foreach(_ => {})
+
+ // Verify data for id="1" (updated record)
+ val record1 = dfFinal.filter("id = '1'").collect()
+ assertEquals(1, record1.length, "Should have exactly one record with id='1'")
+ val location1 = record1(0).getAs[Row]("location")
+ assertNotNull(location1, "location should not be null for id='1'")
+ assertEquals(10, location1.getInt(0), "Location 'x' should be 10")
+ assertEquals(20, location1.getInt(1), "Location 'y' should be 20")
+ assertEquals(30, location1.getInt(2), "Location 'z' should be 30")
+ assertEquals(40, location1.getInt(3), "Location 'w' should be 40")
+
+ // Verify data for id="2" (unchanged record - should have null for 'w')
+ val record2 = dfFinal.filter("id = '2'").collect()
+ assertEquals(1, record2.length, "Should have exactly one record with id='2'")
+ val location2 = record2(0).getAs[Row]("location")
+ assertNotNull(location2, "location should not be null for id='2'")
+ assertEquals(40, location2.getInt(0), "Location 'x' should be 40")
+ assertEquals(50, location2.getInt(1), "Location 'y' should be 50")
+ assertEquals(60, location2.getInt(2), "Location 'z' should be 60")
+ assertTrue(location2.isNullAt(3), "Location 'w' should be null for unchanged record")
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("testParameters"))
+ def testArrayMapStructSchemaEvolutionWithRecordMerger(tableType: String, logFormat: String): Unit = {
+ val tablePath = basePath + s"/array_map_struct_evolution_${tableType}_${logFormat}"
+ val tableName = s"array_map_struct_evolution_${tableType}_${logFormat}"
+
+ // ==========================================================
+ // STEP 1: Initial schema with Array[Map[String, Struct]] (no evolution yet)
+ // ==========================================================
+ val innerStructV1 = new StructType()
+ .add("col1", StringType, nullable = true)
+ .add("col2", StringType, nullable = true)
+ .add("col3", IntegerType, nullable = true)
+ val schemaV1 = new StructType()
+ .add("id", StringType, nullable = true)
+ .add("events", ArrayType(
+ new MapType(StringType, innerStructV1, true)
+ ), nullable = true)
+ val initialData = Seq(
+ Row("1", Seq(
+ Map("2022-12-01" -> Row("a1", "b1", 100)),
+ Map("2022-12-02" -> Row("a2", "b2", 200))
+ )),
+ Row("2", Seq(
+ Map("2022-12-03" -> Row("a3", "b3", 300)),
+ Map("2022-12-04" -> Row("a4", "b4", 400))
+ ))
+ )
+ val dfInit = spark.createDataFrame(spark.sparkContext.parallelize(initialData), schemaV1)
+
+ // ==========================================================
+ // STEP 2: Write initial data using INSERT
+ // ==========================================================
+ var hudiOpts = Map(
+ HoodieTableConfig.NAME.key -> tableName,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
+ DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName
+ )
+ // Add log format for MOR tables
+ if (tableType == DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) {
+ hudiOpts = hudiOpts + (HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logFormat)
+ }
+ dfInit.write.format("hudi").options(hudiOpts).mode(SaveMode.Overwrite).save(tablePath)
+ val dfAfterInsert = spark.read.format("hudi").load(tablePath)
+ assertEquals(2, dfAfterInsert.count(), "Should have 2 records after initial insert")
+
+ // ==========================================================
+ // STEP 3: Schema evolution - Add a new field to the struct inside the map
+ // ==========================================================
+ val innerStructV2 = new StructType()
+ .add("col1", StringType, nullable = true)
+ .add("col2", StringType, nullable = true)
+ .add("col3", IntegerType, nullable = true)
+ .add("col4", IntegerType, nullable = true)
+ val schemaV2 = new StructType()
+ .add("id", StringType, nullable = true)
+ .add("events", ArrayType(
+ new MapType(StringType, innerStructV2, true)
+ ), nullable = true)
+ val evolvedData = Seq(
+ Row("1", Seq(
+ Map("2022-12-01" -> Row("a1", "b1", 100, 1000)),
+ Map("2022-12-02" -> Row("a2", "b2", 200, 2000))
+ ))
+ )
+ val dfEvolved = spark.createDataFrame(
+ spark.sparkContext.parallelize(evolvedData),
+ schemaV2
+ )
+
+ // ==========================================================
+ // STEP 4: Upsert with HoodieSparkRecordMerger
+ // ==========================================================
+ var hudiOptsUpsert = Map(
+ HoodieTableConfig.NAME.key -> tableName,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "id",
+ DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> "org.apache.hudi.HoodieSparkRecordMerger"
+ )
+ // Add log format for MOR tables
+ if (tableType == DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) {
+ hudiOptsUpsert = hudiOptsUpsert + (HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logFormat)
+ }
+ dfEvolved.write.format("hudi").options(hudiOptsUpsert).mode(SaveMode.Append).save(tablePath)
+
+ // ==========================================================
+ // STEP 5: Load after upsert and verify data integrity
+ // ==========================================================
+ val dfFinal = spark.read.format("hudi").load(tablePath)
+ assertEquals(2, dfFinal.count(), "Should still have 2 records after upsert")
+
+ // Verify the schema includes the new field 'col4' in nested struct
+ val finalSchema = dfFinal.schema
+ val eventsField = finalSchema.fields.find(_.name == "events").get
+ assertTrue(eventsField.dataType.isInstanceOf[ArrayType], "events should be an ArrayType")
+ val arrayType = eventsField.dataType.asInstanceOf[ArrayType]
+ assertTrue(arrayType.elementType.isInstanceOf[MapType], "events array element should be MapType")
+ val mapType = arrayType.elementType.asInstanceOf[MapType]
+ assertTrue(mapType.valueType.isInstanceOf[StructType], "map value should be StructType")
+ val structType = mapType.valueType.asInstanceOf[StructType]
+ val fieldNames = structType.fields.map(_.name)
+ assertTrue(fieldNames.contains("col4"), "Schema should include the evolved field 'col4'")
+ assertEquals(4, fieldNames.length, "Struct should have 4 fields (col1, col2, col3, col4)")
+ dfFinal.foreach(_ => {})
+
+ // Verify data for id="1" (updated record)
+ val record1 = dfFinal.filter("id = '1'").collect()
+ assertEquals(1, record1.length, "Should have exactly one record with id='1'")
+ val events1 = record1(0).getAs[scala.collection.Seq[scala.collection.Map[String, Row]]]("events").toSeq
+ assertNotNull(events1, "events should not be null for id='1'")
+ assertEquals(2, events1.length, "id='1' should have 2 events")
+
+ val firstEvent1 = events1.head.get("2022-12-01").get
+ assertEquals("a1", firstEvent1.getString(0), "First event col1 should be
'a1'")
+ assertEquals("b1", firstEvent1.getString(1), "First event col2 should be
'b1'")
+ assertEquals(100, firstEvent1.getInt(2), "First event col3 should be 100")
+ assertEquals(1000, firstEvent1.getInt(3), "First event col4 should be
1000")
+
+ // Verify data for id="2" (unchanged record - should have null for 'col4')
+ val record2 = dfFinal.filter("id = '2'").collect()
+ assertEquals(1, record2.length, "Should have exactly one record with id='2'")
+ val events2 = record2(0).getAs[scala.collection.Seq[scala.collection.Map[String, Row]]]("events").toSeq
+ assertNotNull(events2, "events should not be null for id='2'")
+ assertEquals(2, events2.length, "id='2' should have 2 events")
+
+ val firstEvent2 = events2.head.get("2022-12-03").get
+ assertEquals("a3", firstEvent2.getString(0), "First event col1 should be
'a3'")
+ assertEquals("b3", firstEvent2.getString(1), "First event col2 should be
'b3'")
+ assertEquals(300, firstEvent2.getInt(2), "First event col3 should be 300")
+ assertTrue(firstEvent2.isNullAt(3), "First event col4 should be null for
unchanged record")
+ }
+}
+
+object TestComplexTypeSchemaEvolution {
+ def testParameters(): java.util.stream.Stream[Arguments] = {
+ import org.junit.jupiter.params.provider.Arguments.arguments
+ java.util.stream.Stream.of(
+ arguments(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, "avro"),
+ arguments(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, "parquet")
+ )
+ }
+}