rahil-c commented on code in PR #17904:
URL: https://github.com/apache/hudi/pull/17904#discussion_r2766451006


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkSchemaTransformUtils.scala:
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.HoodieSchemaUtils
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, AttributeReference, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
+import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}
+
+/**
+ * Format-agnostic utilities for Spark schema transformations including NULL padding
+ * and recursive type casting with workarounds for unsafe conversions.
+ *
+ * These utilities are used by file format readers that need to:
+ * - Pad missing columns with NULL literals (required for Lance)
+ * - Handle nested struct/array/map type conversions
+ * - Work around Spark unsafe cast issues (float->double, numeric->decimal)
+ *
+ * Note: The following functions were originally part of HoodieParquetFileFormatHelper
+ * and have been moved here to allow reuse across multiple file formats:
+ * - buildImplicitSchemaChangeInfo
+ * - isDataTypeEqual
+ * - generateUnsafeProjection
+ * - hasUnsupportedConversion
+ * - recursivelyCastExpressions
+ */
+object SparkSchemaTransformUtils {
+
+  /**
+   * Generate UnsafeProjection for type casting with special handling for unsupported conversions.
+   *
+   * @param fullSchema Complete schema including data and partition columns
+   * @param timeZoneId Session timezone for timestamp conversions
+   * @param typeChangeInfos Map of field index to (targetType, readerType) for fields needing casting
+   * @param requiredSchema Schema requested by the query (data columns only)
+   * @param partitionSchema Schema of partition columns
+   * @param schemaUtils Spark adapter schema utilities
+   * @return UnsafeProjection that applies type casting to rows
+   */
+  def generateUnsafeProjection(fullSchema: Seq[Attribute],
+                               timeZoneId: Option[String],
+                               typeChangeInfos: java.util.Map[Integer, org.apache.hudi.common.util.collection.Pair[DataType, DataType]],
+                               requiredSchema: StructType,
+                               partitionSchema: StructType,
+                               schemaUtils: HoodieSchemaUtils): UnsafeProjection = {
+    if (typeChangeInfos.isEmpty) {
+      GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+    } else {
+      // Find the fields whose type changed.
+      val newSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
+        if (typeChangeInfos.containsKey(i)) {
+          StructField(f.name, typeChangeInfos.get(i).getRight, f.nullable, f.metadata)
+        } else f
+      })
+      val newFullSchema = schemaUtils.toAttributes(newSchema) ++ schemaUtils.toAttributes(partitionSchema)
+      val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
+        if (typeChangeInfos.containsKey(i)) {
+          val srcType = typeChangeInfos.get(i).getRight
+          val dstType = typeChangeInfos.get(i).getLeft
+          SparkSchemaTransformUtils.recursivelyCastExpressions(
+            attr, srcType, dstType, timeZoneId
+          )
+        } else attr
+      }
+      GenerateUnsafeProjection.generate(castSchema, newFullSchema)
+    }
+  }
+
+  /**
+   * Generate UnsafeProjection that pads missing columns with NULL literals.
+   *
+   * @param inputSchema Schema from file (fields actually present)
+   * @param targetSchema Target output schema (may have more fields than file)
+   * @return UnsafeProjection that pads missing columns with NULLs
+   */
+  def generateNullPaddingProjection(
+      inputSchema: StructType,
+      targetSchema: StructType
+  ): UnsafeProjection = {
+    val inputAttributes = inputSchema.fields.map(f =>
+      AttributeReference(f.name, f.dataType, f.nullable)())
+    val inputFieldMap = inputAttributes.map(a => a.name -> a).toMap
+
+    // Build expressions for all target fields, padding missing columns with NULL
+    val expressions = targetSchema.fields.map { field =>
+      inputFieldMap.get(field.name) match {
+        case Some(attr) =>
+          // Field exists in input - check if nested padding needed
+          if (needsNestedPadding(attr.dataType, field.dataType)) {
+            recursivelyPadExpression(attr, attr.dataType, field.dataType)
+          } else {
+            attr
+          }
+        case None =>
+          // Field missing from input, use NULL literal for padding
+          Literal(null, field.dataType)
+      }
+    }
+
+    GenerateUnsafeProjection.generate(expressions, inputAttributes)
+  }
+
+  /**
+   * Recursively pad nested struct/array/map fields with NULLs.
+   *
+   * @param expr Source expression
+   * @param srcType Source data type
+   * @param dstType Destination data type (may have additional nested fields)
+   * @return Expression with NULL padding for missing nested fields
+   */
+  private def recursivelyPadExpression(
+      expr: Expression,
+      srcType: DataType,
+      dstType: DataType
+  ): Expression = (srcType, dstType) match {
+    case (s: StructType, d: StructType) =>
+      val srcFieldMap = s.fields.zipWithIndex.map { case (f, i) => f.name -> (f, i) }.toMap
+      val structFields = d.fields.map { dstField =>
+        srcFieldMap.get(dstField.name) match {
+          case Some((srcField, srcIndex)) =>
+            val child = GetStructField(expr, srcIndex, Some(dstField.name))
+            recursivelyPadExpression(child, srcField.dataType, dstField.dataType)
+          case None =>
+            Literal(null, dstField.dataType)
+        }
+      }
+      CreateNamedStruct(d.fields.zip(structFields).flatMap {
+        case (f, c) => Seq(Literal(f.name), c)
+      })
+
+    case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _))
+        if needsNestedPadding(sElementType, dElementType) =>
+      val lambdaVar = NamedLambdaVariable("element", sElementType, containsNull)
+      val body = recursivelyPadExpression(lambdaVar, sElementType, dElementType)
+      val func = LambdaFunction(body, Seq(lambdaVar))
+      ArrayTransform(expr, func)
+
+    case (MapType(sKeyType, sValType, vnull), MapType(dKeyType, dValType, _))
+        if needsNestedPadding(sKeyType, dKeyType) || needsNestedPadding(sValType, dValType) =>
+      val kv = NamedLambdaVariable("kv", new StructType()
+        .add("key", sKeyType, nullable = false)
+        .add("value", sValType, nullable = vnull), nullable = false)
+      val newKey = recursivelyPadExpression(GetStructField(kv, 0), sKeyType, dKeyType)
+      val newVal = recursivelyPadExpression(GetStructField(kv, 1), sValType, dValType)
+      val entry = CreateStruct(Seq(newKey, newVal))
+      val func = LambdaFunction(entry, Seq(kv))
+      val transformed = ArrayTransform(MapEntries(expr), func)
+      MapFromEntries(transformed)
+
+    case _ =>
+      // No padding needed, return expression as-is
+      expr
+  }
+
+  /**
+   * Recursively cast expressions with special handling for unsupported conversions.
+   *
+   * @param expr Source expression to cast
+   * @param srcType Source data type
+   * @param dstType Destination data type
+   * @param timeZoneId Session timezone for timestamp conversions
+   * @return Casted expression with workarounds for unsafe conversions
+   */
+  def recursivelyCastExpressions(
+      expr: Expression,
+      srcType: DataType,
+      dstType: DataType,
+      timeZoneId: Option[String]
+  ): Expression = {
+    lazy val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+    (srcType, dstType) match {
+      case (FloatType, DoubleType) =>
+        val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
+        Cast(toStr, dstType, if (needTimeZone) timeZoneId else None)
+      case (IntegerType | LongType | FloatType | DoubleType, dec: DecimalType) =>
+        val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
+        Cast(toStr, dec, if (needTimeZone) timeZoneId else None)
+      case (StringType, dec: DecimalType) =>
+        Cast(expr, dec, if (needTimeZone) timeZoneId else None)
+      case (StringType, DateType) =>
+        Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
+      case (s: StructType, d: StructType) if hasUnsupportedConversion(s, d) =>
+        val structFields = s.fields.zip(d.fields).zipWithIndex.map {
+          case ((srcField, dstField), i) =>
+            val child = GetStructField(expr, i, Some(dstField.name))
+            recursivelyCastExpressions(child, srcField.dataType, dstField.dataType, timeZoneId)
+        }
+        CreateNamedStruct(d.fields.zip(structFields).flatMap {
+          case (f, c) => Seq(Literal(f.name), c)
+        })
+      case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasUnsupportedConversion(sElementType, dElementType) =>
+        val lambdaVar = NamedLambdaVariable("element", sElementType, containsNull)
+        val body = recursivelyCastExpressions(lambdaVar, sElementType, dElementType, timeZoneId)
+        val func = LambdaFunction(body, Seq(lambdaVar))
+        ArrayTransform(expr, func)
+      case (MapType(sKeyType, sValType, vnull), MapType(dKeyType, dValType, _))
+        if hasUnsupportedConversion(sKeyType, dKeyType) || hasUnsupportedConversion(sValType, dValType) =>
+        val kv = NamedLambdaVariable("kv", new StructType()
+          .add("key", sKeyType, nullable = false)
+          .add("value", sValType, nullable = vnull), nullable = false)
+        val newKey = recursivelyCastExpressions(GetStructField(kv, 0), sKeyType, dKeyType, timeZoneId)
+        val newVal = recursivelyCastExpressions(GetStructField(kv, 1), sValType, dValType, timeZoneId)
+        val entry = CreateStruct(Seq(newKey, newVal))
+        val func = LambdaFunction(entry, Seq(kv))
+        val transformed = ArrayTransform(MapEntries(expr), func)
+        MapFromEntries(transformed)
+      case _ =>
+        // Most cases are covered by this default cast; the recursive handling above is only needed for conversions like float to double.
+        Cast(expr, dstType, if (needTimeZone) timeZoneId else None)
+    }
+  }
+
+  /**
+   * Used to determine if padding is required for nested struct fields.
+   * @param srcType Source data type
+   * @param dstType Destination data type
+   * @return true if destination has additional fields requiring NULL padding
+   */
+  private def needsNestedPadding(srcType: DataType, dstType: DataType): Boolean = (srcType, dstType) match {
+    // Need padding if destination has more fields or nested fields differ
+    case (StructType(srcFields), StructType(dstFields)) =>

Review Comment:
   When investigating this further and applying this e2e test in `TestLanceDataSource`, I do not think Hudi considers this case to be a valid evolution; it hits a failure much earlier.
   ```
   @ParameterizedTest
     @EnumSource(value = classOf[HoodieTableType])
     def testSchemaEvolutionNestedFieldRename(tableType: HoodieTableType): Unit = {
       val tableName = s"test_lance_nested_rename_${tableType.name.toLowerCase}"
       val tablePath = s"$basePath/$tableName"
   
       // Simulates production scenario: gradual migration from old to new field names
       // Old files have person.firstName, new files have person.fullName
       // When reading with evolved schema, old files should get NULL for fullName
   
       // Batch 1: Write with OLD schema using firstName
       val schema1 = StructType(Seq(
         StructField("id", IntegerType, false),
         StructField("person", StructType(Seq(
           StructField("firstName", StringType, true),
           StructField("age", IntegerType, true)
         )), true)
       ))
       val data1 = Seq(
         Row(1, Row("Alice", 30)),
         Row(2, Row("Bob", 25))
       )
       val df1 = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema1)
       writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
   
       // Batch 2: Write with NEW schema using fullName (simulating migration/rename)
       val schema2 = StructType(Seq(
         StructField("id", IntegerType, false),
         StructField("person", StructType(Seq(
           StructField("fullName", StringType, true),
           StructField("age", IntegerType, true)
         )), true)
       ))
       val data2 = Seq(
         Row(3, Row("Charlie", 35))
       )
       val df2 = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema2)
       writeDataframe(tableType, tableName, tablePath, df2, saveMode = SaveMode.Append, operation = Some("insert"))
   
       // Read with evolved schema (fullName) - tests name-based field matching
       val readDf = spark.read.format("hudi").load(tablePath)
       val actual = readDf.select("id", "person.fullName", "person.age")
   
       // Expected behavior with name-based matching:
       // - Records from Batch 1: fullName=NULL (field doesn't exist in old files)
       // - Records from Batch 2: fullName="Charlie" (field exists)
       // - All records: age should be preserved
       val expectedDf = spark.createDataFrame(Seq(
         (1, null, 30),          // Old schema: firstName not mapped to fullName
         (2, null, 25),          // Old schema: firstName not mapped to fullName
         (3, "Charlie", 35)      // New schema: has fullName
       )).toDF("id", "fullName", "age")
   
       assertTrue(expectedDf.except(actual).isEmpty, "Expected and actual should match")
       assertTrue(actual.except(expectedDf).isEmpty, "Actual and expected should match")
     }
   
   ```
   
   It ends up failing on table schema validation:
   ```
   org.apache.hudi.exception.MissingSchemaFieldException: Schema validation failed due to missing field. Fields missing from incoming schema: {test_lance_nested_rename_copy_on_write_record.person.person.firstName}
   writerSchema: {"type":"record","name":"test_lance_nested_rename_copy_on_write_record","namespace":"hoodie.test_lance_nested_rename_copy_on_write","fields":[{"name":"id","type":"int"},{"name":"person","type":["null",{"type":"record","name":"person","namespace":"hoodie.test_lance_nested_rename_copy_on_write.test_lance_nested_rename_copy_on_write_record","fields":[{"name":"age","type":["null","int"],"default":null},{"name":"fullName","type":["null","string"],"default":null}]}],"default":null}]}
   tableSchema: {"type":"record","name":"test_lance_nested_rename_copy_on_write_record","namespace":"hoodie.test_lance_nested_rename_copy_on_write","fields":[{"name":"id","type":"int"},{"name":"person","type":["null",{"type":"record","name":"person","namespace":"hoodie.test_lance_nested_rename_copy_on_write.test_lance_nested_rename_copy_on_write_record","fields":[{"name":"firstName","type":["null","string"],"default":null},{"name":"age","type":["null","int"],"default":null}]}],"default":null}]}

        at org.apache.hudi.avro.AvroSchemaUtils.checkValidEvolution(AvroSchemaUtils.java:585)
        at org.apache.hudi.common.schema.HoodieSchemaCompatibility.checkValidEvolution(HoodieSchemaCompatibility.java:117)
        at org.apache.hudi.HoodieSchemaUtils$.deduceWriterSchemaWithoutReconcile(HoodieSchemaUtils.scala:181)
   ```
   
   Since it would be confusing to return the previous data of `firstName`, and, if this was a rename operation on the column, it is equally confusing to return `NULL`, my assumption is that there should be no null-padding case when the number of columns is the same but the column names differ.
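
   For illustration, a minimal sketch of the guard this suggests (`looksLikeRename` is a hypothetical helper I am naming for this comment, not code from this PR): when two structs have the same number of fields but the field names diverge, the shape looks like a rename rather than an added column, so padding should not apply.
   ```scala
   import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

   // Hypothetical guard (illustration only): equal field counts with divergent
   // names suggest a rename, not an added column, so NULL padding would silently
   // hide the old column's data instead of representing a genuinely new field.
   def looksLikeRename(src: StructType, dst: StructType): Boolean =
     src.fields.length == dst.fields.length &&
       src.fieldNames.toSet != dst.fieldNames.toSet

   val oldPerson = new StructType().add("firstName", StringType).add("age", IntegerType)
   val newPerson = new StructType().add("fullName", StringType).add("age", IntegerType)
   assert(looksLikeRename(oldPerson, newPerson)) // this shape should not be padded
   ```
   Under that assumption, `needsNestedPadding` could short-circuit to false for this shape, so the struct branch only pads genuine column additions.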



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
