rahil-c commented on code in PR #17904:
URL: https://github.com/apache/hudi/pull/17904#discussion_r2756464324


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkBasicSchemaEvolution.scala:
##########
@@ -20,32 +20,118 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hudi.SparkAdapterSupport.sparkAdapter
-
+import org.apache.hudi.common.model.HoodieFileFormat
 import org.apache.spark.sql.HoodieSchemaUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources.SparkSchemaTransformUtils
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
 
 
 /**
- * Intended to be used just with HoodieSparkParquetReader to avoid any java/scala issues
+ * Generic schema evolution handler for different file formats.
+ * Supports Parquet (default), and Lance currently.

Review Comment:
   I'll try my best to explain this based on my understanding, working 
backwards. Note that I'll use screenshots from the test case 
`testSchemaEvolutionAddColumn`, which adds a new column `email` to the 
schema, to help illustrate the flow.
   
   Ultimately, if I understand correctly, the goal is to generate an 
`UnsafeProjection` and apply it to each of the rows coming out of either the 
Parquet or the Lance iterator. The projection handles things such as 
correctly casting the rows to the evolved schema.
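
As a rough illustration of that idea (a minimal sketch, not the code in this PR), the snippet below builds a projection that widens an `INT` column to `LONG` and applies it to every row of an iterator. The object name `CastingProjectionSketch`, the helper `widenIdToLong`, and the two-column layout are hypothetical; it only assumes Spark's catalyst classes are on the classpath.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast, Expression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object CastingProjectionSketch {
  // Hypothetical helper: column 0 is stored as INT in the file but is LONG
  // in the evolved schema, so it is wrapped in a Cast. Column 1 passes through.
  def widenIdToLong(): UnsafeProjection = {
    val idAsLong: Expression = Cast(BoundReference(0, IntegerType, nullable = true), LongType)
    val name: Expression = BoundReference(1, StringType, nullable = true)
    UnsafeProjection.create(Seq(idAsLong, name))
  }

  def main(args: Array[String]): Unit = {
    val projection = widenIdToLong()
    val fileRows = Iterator(
      InternalRow(1, UTF8String.fromString("a")),
      InternalRow(2, UTF8String.fromString("b")))

    // The projection reuses a single output buffer, so copy() any row that
    // is retained beyond the current iteration step.
    val evolved: List[UnsafeRow] = fileRows.map(row => projection(row).copy()).toList
    evolved.foreach(r => println(s"${r.getLong(0)} ${r.getUTF8String(1)}"))
  }
}
```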
   
   Pointer to Spark's `CodeGenerator.scala`, where this unsafe projection 
generation code comes from:
   <img width="991" height="143" alt="Screenshot 2026-02-02 at 3 10 25 PM" src="https://github.com/user-attachments/assets/804e7463-b13d-4d76-8ebf-b30cfc6a4f7c" />
   
   *Iterators that apply the projection to each row*
   Lance: https://github.com/apache/hudi/pull/17904/changes#diff-bdccaaaeb061abdf550efec86661f9d3790c66d53e04b1ed2e9cf9a61ea06e13R136
   Parquet: https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java#L183
   
   The main reason Parquet and Lance are handled differently, even at the 
projection level, is that for `LANCE` we have to manually ensure that the 
`expressions` passed in are correctly ordered and padded, so that we do not 
cause an invalid row conversion. In the `PARQUET` case the rows already arrive 
padded with NULLs, as mentioned above 
(https://github.com/apache/hudi/pull/17904#issuecomment-3761550758), because 
Parquet's native reader handles this for us.
   
   So when we get to the unsafe projection generation logic, we can directly 
pass the `fullSchema`, which is the evolved schema (even though the file may 
not have the new column), and not worry about constructing the row incorrectly.
   
   <img width="1315" height="694" alt="Screenshot 2026-02-02 at 3 06 13 PM" src="https://github.com/user-attachments/assets/efe22037-249a-406a-8744-7ea234add31c" />
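
To make the Parquet path concrete, here is a small sketch under stated assumptions: the incoming row is already NULL-padded to the evolved schema, so a projection can be created straight from that schema. The object name, the `project` helper, and the three-column `fullSchema` are made up for illustration and are not taken from the PR.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object ParquetStyleProjectionSketch {
  // Hypothetical evolved schema: `email` was added after the file was written.
  val fullSchema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType),
    StructField("email", StringType)))

  // The row is assumed to arrive with one slot per evolved field, so the
  // projection can bind to fullSchema positionally.
  def project(row: InternalRow): UnsafeRow =
    UnsafeProjection.create(fullSchema)(row)

  def main(args: Array[String]): Unit = {
    // Row as a NULL-padding reader would hand it over: email slot is null.
    val padded = InternalRow(1, UTF8String.fromString("alice"), null)
    val out = project(padded)
    println(s"numFields=${out.numFields}, emailIsNull=${out.isNullAt(2)}")
  }
}
```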
   
   For the `LANCE` case, however, the incoming rows are bounded by the columns 
that are actually contained in the file (the Lance file reader limitation 
mentioned in 
https://github.com/apache/hudi/pull/17904#issuecomment-3761550758), so we 
cannot pass the `fullSchema`.
    
   So the incoming rows do not have any null padding, which requires us to 
construct a projection that aligns with the evolved schema. That is why we 
pad here: 
https://github.com/apache/hudi/pull/17904/changes#diff-56d3b110e2b04263ed60368227bddd9bef085799f4917701f936cbc9f7f71572R77
 
    
   <img width="1313" height="842" alt="Screenshot 2026-02-02 at 2 01 24 PM" src="https://github.com/user-attachments/assets/549b9bc3-405b-4f8c-b28f-daec1eea9c48" />
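
The padding described above can be sketched roughly as follows. This is an assumption-laden illustration rather than the PR's implementation: `paddedExpressions` is a hypothetical helper that emits one expression per evolved field, binding to the file column when it exists and falling back to a typed null literal when it does not. Type casts and nested types are left out to keep it short.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, Literal, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object PaddedProjectionSketch {
  // Hypothetical helper: the output order follows the evolved schema, and
  // each ordinal refers to a position in the (narrower) file row.
  def paddedExpressions(fileSchema: StructType, evolvedSchema: StructType): Seq[Expression] =
    evolvedSchema.fields.toSeq.map { field =>
      fileSchema.getFieldIndex(field.name) match {
        // Column exists in the file: read it from its position in the file row.
        case Some(ordinal) =>
          BoundReference(ordinal, fileSchema(ordinal).dataType, nullable = true)
        // Column is missing from the file: pad with a typed NULL.
        case None =>
          Literal.create(null, field.dataType)
      }
    }

  def main(args: Array[String]): Unit = {
    val fileSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))
    val evolvedSchema = fileSchema.add(StructField("email", StringType))

    val projection = UnsafeProjection.create(paddedExpressions(fileSchema, evolvedSchema))

    // The file row only has the two columns that were written to the file.
    val fileRow = InternalRow(1, UTF8String.fromString("alice"))
    val out = projection(fileRow)
    println(s"numFields=${out.numFields}, emailIsNull=${out.isNullAt(2)}")
  }
}
```

The design point is that the projected row always has as many fields as the evolved schema, so a downstream writer that iterates over the evolved schema never asks for an ordinal the row does not have.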
   
   When testing without the padding, we hit index-out-of-bounds failures at 
the `UnsafeRow` level:
   ```
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:365)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$e664f7e$1(BaseSparkCommitActionExecutor.java:298)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:381)
        at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1372)
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
        at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.AssertionError: index (9) should < 9
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:118)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:315)
        at org.apache.hudi.client.model.HoodieInternalRow.isNullAt(HoodieInternalRow.java:114)
        at com.lancedb.lance.spark.arrow.LanceArrowFieldWriter.write(LanceArrowFieldWriter.scala:44)
        at com.lancedb.lance.spark.arrow.LanceArrowWriter.write(LanceArrowWriter.scala:140)
        at org.apache.hudi.io.storage.HoodieSparkLanceWriter$SparkArrowWriter.write(HoodieSparkLanceWriter.java:185)
        at org.apache.hudi.io.storage.HoodieSparkLanceWriter$SparkArrowWriter.write(HoodieSparkLanceWriter.java:179)
        at org.apache.hudi.io.lance.HoodieBaseLanceWriter.write(HoodieBaseLanceWriter.java:121)
        at org.apache.hudi.io.storage.HoodieSparkLanceWriter.writeRow(HoodieSparkLanceWriter.java:125)
        at org.apache.hudi.io.storage.HoodieSparkFileWriter.write(HoodieSparkFileWriter.java:41)
        at org.apache.hudi.io.HoodieWriteMergeHandle.writeToFile(HoodieWriteMergeHandle.java:417)
        at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:289)
        at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
        ... 35 more
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
