rahil-c commented on code in PR #17904:
URL: https://github.com/apache/hudi/pull/17904#discussion_r2756464324
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkBasicSchemaEvolution.scala:
##########
@@ -20,32 +20,118 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
-
+import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources.SparkSchemaTransformUtils
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
/**
- * Intended to be used just with HoodieSparkParquetReader to avoid any java/scala issues
+ * Generic schema evolution handler for different file formats.
+ * Supports Parquet (default), and Lance currently.
Review Comment:
I'll try my best to explain this based on my understanding, working backwards. Note: to help illustrate the flow, I'll use screenshots from a test case, `testSchemaEvolutionAddColumn`, which adds a new column `email` to the schema.
Ultimately, if I understand correctly, the goal is to create a projection via `UnsafeProjection.generate` and apply it to each of the unsafe rows in either the Parquet or Lance iterator, which handles things such as correctly casting the rows to an evolved schema.
A pointer to Spark's `CodeGenerator.scala`, where this unsafe-projection generation code comes from:
<img width="991" height="143" alt="Screenshot 2026-02-02 at 3 10 25 PM"
src="https://github.com/user-attachments/assets/804e7463-b13d-4d76-8ebf-b30cfc6a4f7c"
/>
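To make the mechanics concrete, here is a minimal sketch (hypothetical names, not the PR's actual code) of what that entry point amounts to: `UnsafeProjection.create` takes a sequence of expressions plus the input attributes and code-generates an `InternalRow => UnsafeRow` function, which is what both readers then map over their rows.
```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, UnsafeProjection}
import org.apache.spark.sql.types.DataType

// Hypothetical sketch: cast each input attribute to its evolved target type
// and let Spark codegen the resulting projection (this is where the
// CodeGenerator machinery in the screenshot above kicks in).
def evolutionProjection(inputAttrs: Seq[Attribute], targetTypes: Seq[DataType]): UnsafeProjection = {
  val exprs: Seq[Expression] = inputAttrs.zip(targetTypes).map {
    case (attr, targetType) => Cast(attr, targetType)
  }
  UnsafeProjection.create(exprs, inputAttrs)
}
```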
*Iterators that use the projection and apply it to each row:*
Lance:
https://github.com/apache/hudi/pull/17904/changes#diff-bdccaaaeb061abdf550efec86661f9d3790c66d53e04b1ed2e9cf9a61ea06e13R136
Parquet:
https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java#L183
The main reason Parquet and Lance differ in their handling, even at the projection level, is that for `LANCE` we are required to manually ensure the `expressions` passed in are correctly ordered and padded so as not to cause an invalid row conversion. In the `PARQUET` case the rows arrive already padded with NULLs, as mentioned above (https://github.com/apache/hudi/pull/17904#issuecomment-3761550758), because Parquet's native reader handles this for us.
So when we get to the generate-unsafe-projection logic, we can directly pass the `fullSchema`, i.e. the evolved schema (even though the file may not have the new column), and not worry about constructing the row incorrectly.
<img width="1315" height="694" alt="Screenshot 2026-02-02 at 3 06 13 PM"
src="https://github.com/user-attachments/assets/efe22037-249a-406a-8744-7ea234add31c"
/>
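A minimal sketch of that Parquet-side flow, with hypothetical names (`fullSchema`, a reader row iterator): because missing columns are already NULL-padded by the native reader, the projection can be built straight from the evolved schema.
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType

// PARQUET path (hypothetical names): rows already carry NULLs for columns
// missing from the file, so they arrive with fullSchema's arity and the
// projection can be generated from the evolved schema directly.
def projectParquetRows(rows: Iterator[InternalRow], fullSchema: StructType): Iterator[InternalRow] = {
  val projection = UnsafeProjection.create(fullSchema)
  rows.map(projection) // UnsafeProjection is an InternalRow => UnsafeRow function
}
```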
For the `LANCE` case, however, the rows coming in are bounded by which columns are actually contained in the file (the limitation of the Lance file reader mentioned in https://github.com/apache/hudi/pull/17904#issuecomment-3761550758), so we cannot pass the `fullSchema` directly.
So the rows coming in do not have any NULL padding, requiring us to construct a projection that aligns with the evolved schema, hence the padding here:
https://github.com/apache/hudi/pull/17904/changes#diff-56d3b110e2b04263ed60368227bddd9bef085799f4917701f936cbc9f7f71572R77
<img width="1313" height="842" alt="Screenshot 2026-02-02 at 2 01 24 PM"
src="https://github.com/user-attachments/assets/549b9bc3-405b-4f8c-b28f-daec1eea9c48"
/>
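To illustrate the padding idea (a hypothetical sketch under my reading of the linked code, not a copy of it): each field of the evolved schema is either bound to its physical position in the file schema, or replaced with a NULL literal of the target type, so the output row always has `fullSchema`'s arity.
```scala
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, Literal, UnsafeProjection}
import org.apache.spark.sql.types.StructType

// LANCE path (hypothetical sketch): the reader emits only the columns the
// file actually contains, so the projection itself must do the padding.
def paddedLanceProjection(fileSchema: StructType, fullSchema: StructType): UnsafeProjection = {
  val exprs: Seq[Expression] = fullSchema.fields.toSeq.map { field =>
    val idx = fileSchema.fieldNames.indexOf(field.name)
    if (idx >= 0) {
      // Column exists in the file: read it from its physical position.
      BoundReference(idx, fileSchema.fields(idx).dataType, fileSchema.fields(idx).nullable)
    } else {
      // Column added by schema evolution: pad with NULL of the target type
      // (e.g. the new `email` column in the test case).
      Literal.create(null, field.dataType)
    }
  }
  // Expression order follows fullSchema, so downstream writers always see
  // a row with the evolved schema's arity.
  UnsafeProjection.create(exprs)
}
```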
When testing without the padding, it seems to lead to index-out-of-bounds failures at the `UnsafeRow` level: the Lance Arrow writer asks for the evolved schema's tenth field (ordinal 9, i.e. the new `email` column), but the unpadded row only carries the file's nine fields, so `UnsafeRow`'s bounds assertion trips.
```
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:365)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$e664f7e$1(BaseSparkCommitActionExecutor.java:298)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:381)
    at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1372)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.AssertionError: index (9) should < 9
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.assertIndexIsValid(UnsafeRow.java:118)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.isNullAt(UnsafeRow.java:315)
    at org.apache.hudi.client.model.HoodieInternalRow.isNullAt(HoodieInternalRow.java:114)
    at com.lancedb.lance.spark.arrow.LanceArrowFieldWriter.write(LanceArrowFieldWriter.scala:44)
    at com.lancedb.lance.spark.arrow.LanceArrowWriter.write(LanceArrowWriter.scala:140)
    at org.apache.hudi.io.storage.HoodieSparkLanceWriter$SparkArrowWriter.write(HoodieSparkLanceWriter.java:185)
    at org.apache.hudi.io.storage.HoodieSparkLanceWriter$SparkArrowWriter.write(HoodieSparkLanceWriter.java:179)
    at org.apache.hudi.io.lance.HoodieBaseLanceWriter.write(HoodieBaseLanceWriter.java:121)
    at org.apache.hudi.io.storage.HoodieSparkLanceWriter.writeRow(HoodieSparkLanceWriter.java:125)
    at org.apache.hudi.io.storage.HoodieSparkFileWriter.write(HoodieSparkFileWriter.java:41)
    at org.apache.hudi.io.HoodieWriteMergeHandle.writeToFile(HoodieWriteMergeHandle.java:417)
    at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:289)
    at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
    ... 35 more
```