rahil-c commented on code in PR #17904:
URL: https://github.com/apache/hudi/pull/17904#discussion_r2734496577
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkBasicSchemaEvolution.scala:
##########
@@ -20,32 +20,73 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType
/**
- * Intended to be used just with HoodieSparkParquetReader to avoid any
java/scala issues
+ * Generic schema evolution handler for different file formats.
+ * Supports Parquet (default), and Lance currently.
+ *
+ * @param fileSchema The schema from the file
+ * @param requiredSchema The schema requested by the query
+ * @param sessionLocalTimeZone The session's local timezone
+ * @param fileFormat The file format being read. Defaults to PARQUET for
backward compatibility.
*/
class SparkBasicSchemaEvolution(fileSchema: StructType,
requiredSchema: StructType,
- sessionLocalTimeZone: String) {
+ sessionLocalTimeZone: String,
+ fileFormat: HoodieFileFormat =
HoodieFileFormat.PARQUET) {
val (implicitTypeChangeInfo, sparkRequestSchema) =
HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(fileSchema,
requiredSchema)
def getRequestSchema: StructType = {
- if (implicitTypeChangeInfo.isEmpty) {
- requiredSchema
- } else {
- sparkRequestSchema
+ fileFormat match {
+ case HoodieFileFormat.PARQUET =>
+ if (implicitTypeChangeInfo.isEmpty) {
+ requiredSchema
+ } else {
+ sparkRequestSchema
+ }
+ case HoodieFileFormat.LANCE =>
+ // need to filter to only fields that exist in file for lance
+ val fileFieldNames = fileSchema.fieldNames.toSet
+ StructType(sparkRequestSchema.fields.filter(f =>
fileFieldNames.contains(f.name)))
+ case _ =>
+ throw new UnsupportedOperationException(s"Unsupported file format:
$fileFormat")
}
}
def generateUnsafeProjection(): UnsafeProjection = {
val schemaUtils: HoodieSchemaUtils = sparkAdapter.getSchemaUtils
-
HoodieParquetFileFormatHelper.generateUnsafeProjection(schemaUtils.toAttributes(requiredSchema),
Some(sessionLocalTimeZone),
- implicitTypeChangeInfo, requiredSchema, new StructType(), schemaUtils)
+
+ fileFormat match {
+ case HoodieFileFormat.PARQUET =>
+ HoodieParquetFileFormatHelper.generateUnsafeProjection(
+ schemaUtils.toAttributes(requiredSchema),
+ Some(sessionLocalTimeZone),
+ implicitTypeChangeInfo,
+ requiredSchema,
+ new StructType(),
+ schemaUtils
+ )
+ case HoodieFileFormat.LANCE =>
+ // requires null padding for missing columns
+ val requestSchema = getRequestSchema
+ HoodieParquetFileFormatHelper.generateUnsafeProjection(
Review Comment:
For now I left the `HoodieParquetFileFormatHelper` name unchanged, as I did not
want to add extra noise to the diff for reviewers. However, I do believe that if
this commit looks directionally good, we should likely rename the file,
now that we are trying to use mostly the same code for both
Parquet and Lance for schema evolution.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]