[
https://issues.apache.org/jira/browse/HUDI-9565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jonathan Vexler updated HUDI-9565:
----------------------------------
Status: In Progress (was: Open)
> Proposal: Unify Schema Evolution in the Reader
> ----------------------------------------------
>
> Key: HUDI-9565
> URL: https://issues.apache.org/jira/browse/HUDI-9565
> Project: Apache Hudi
> Issue Type: New Feature
> Components: reader-core
> Reporter: Jonathan Vexler
> Assignee: Jonathan Vexler
> Priority: Major
> Fix For: 1.1.0
>
>
> There are two types of schema evolution:
> schema.on.write
> schema.on.read
> You can read more about them here:
> [https://hudi.apache.org/docs/schema_evolution]
> This document focuses on schema.on.write, but it could later be expanded to
> incorporate schema.on.read as well.
> Schema.on.write is intended to support "easy" evolutions, and many query
> engines already have most of the support built in. Our current support of
> schema evolution is a patchwork of fixes targeted at specific readers or
> engines.
> Avro:
>
> HoodieAvroParquetReader and HoodieAvroDataBlock do the following:
> # Get the schema that the file was written with.
> # Call HoodieAvroUtils#recordNeedsRewriteForExtendedAvroTypePromotion.
> # If that returns false, we can simply use the reader schema to read the
> records.
> # If that returns true, we call
> HoodieAvroUtils#rewriteRecordWithNewSchema on each record to perform any type
> promotions that are not supported natively by Avro.
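> As a rough, hypothetical sketch of this decide-then-rewrite pattern (field
> types are modeled as plain strings rather than real Avro Schemas, and the
> method name below is a stand-in, not the actual HoodieAvroUtils API):
> {code:java}
> import java.util.*;
>
> public class AvroEvolutionSketch {
>     // Promotions Avro's schema resolution handles natively (writer -> reader).
>     private static final Set<String> NATIVE = new HashSet<>(Arrays.asList(
>         "int->long", "int->float", "int->double", "long->float",
>         "long->double", "float->double", "string->bytes", "bytes->string"));
>
>     // Analogous to recordNeedsRewriteForExtendedAvroTypePromotion: true when
>     // some field changed type in a way Avro cannot resolve on its own, so
>     // each record must be rewritten; false means the reader schema suffices.
>     public static boolean needsRewrite(Map<String, String> writerSchema,
>                                        Map<String, String> readerSchema) {
>         for (Map.Entry<String, String> e : readerSchema.entrySet()) {
>             String writerType = writerSchema.get(e.getKey());
>             if (writerType == null || writerType.equals(e.getValue())) {
>                 continue; // new column or unchanged type: nothing to do
>             }
>             if (!NATIVE.contains(writerType + "->" + e.getValue())) {
>                 return true; // e.g. int -> string needs an explicit rewrite
>             }
>         }
>         return false;
>     }
> }
> {code}
> When the check returns false, the per-record rewrite and its cost are
> skipped entirely.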
> Spark:
> We have 2 parquet readers:
> HoodieSparkParquetReader
> This uses the parquet-hadoop reader with a ParquetReadSupport from spark-sql.
> We use it to read parquet log files, but it is also used in the
> non-fg-reader-based merge handle.
>
> The evolution here is similar to Avro. A simple util class,
> SparkBasicSchemaEvolution, invokes
> HoodieParquetFileFormatHelper.buildImplicitSchemaChangeInfo(), which returns
> an object that is essentially a mapping of the columns that need to be cast
> from one type to another. We then invoke
> HoodieParquetFileFormatHelper.generateUnsafeProjection to generate a
> projection that is applied to each record to produce the desired output.
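> As a hedged illustration of that two-step shape (rows modeled as Object[]
> and types as strings; these are simplified stand-ins, not the real
> HoodieParquetFileFormatHelper signatures):
> {code:java}
> import java.util.*;
> import java.util.function.UnaryOperator;
>
> public class SparkBasicEvolutionSketch {
>     // Step 1, analogous to buildImplicitSchemaChangeInfo: diff the file
>     // schema against the required schema and record which column positions
>     // need a cast, and from/to which type.
>     public static Map<Integer, String> changeInfo(List<String> fileTypes,
>                                                   List<String> requiredTypes) {
>         Map<Integer, String> changes = new HashMap<>();
>         for (int i = 0; i < requiredTypes.size(); i++) {
>             if (!fileTypes.get(i).equals(requiredTypes.get(i))) {
>                 changes.put(i, fileTypes.get(i) + "->" + requiredTypes.get(i));
>             }
>         }
>         return changes;
>     }
>
>     // Step 2, analogous to generateUnsafeProjection: build one function up
>     // front and apply it to every record (only int->long is shown here).
>     public static UnaryOperator<Object[]> projection(Map<Integer, String> changes) {
>         return row -> {
>             Object[] out = row.clone();
>             for (Map.Entry<Integer, String> e : changes.entrySet()) {
>                 if ("int->long".equals(e.getValue())) {
>                     out[e.getKey()] = ((Integer) row[e.getKey()]).longValue();
>                 }
>             }
>             return out;
>         };
>     }
> }
> {code}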
> The other spark reader is more complex.
> *Overview of the SparkParquetReader; skip this if you are already familiar
> with it*
> We have an interface
> {code:java}
> trait SparkParquetReader extends Serializable {
>   /**
>    * Read an individual parquet file
>    *
>    * @param file              parquet file to read
>    * @param requiredSchema    desired output schema of the data
>    * @param partitionSchema   schema of the partition columns. Partition
>    *                          values will be appended to the end of every row
>    * @param internalSchemaOpt option of internal schema for schema.on.read
>    * @param filters           filters for data skipping. Not guaranteed to
>    *                          be used; the spark plan will also apply the filters.
>    * @param storageConf       the hadoop conf
>    * @return iterator of rows read from the file output type says
>    *         [[InternalRow]] but could be [[ColumnarBatch]]
>    */
>   def read(file: PartitionedFile,
>            requiredSchema: StructType,
>            partitionSchema: StructType,
>            internalSchemaOpt: util.Option[InternalSchema],
>            filters: Seq[Filter],
>            storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow]
> }
> {code}
> That trait is implemented for each Spark version. The implementations mostly
> match the implementation in OSS Spark's ParquetFileFormat. We have some
> compatibility fixes for Spark minor versions, and we also changed the code so
> that we can read individual parquet files instead of creating a lambda
> function that reads a series of files. This lets us tune the filters and
> schemas at a per-file level, which was not possible with the default Spark
> implementation. We have Spark35ParquetReader, Spark34ParquetReader, and
> Spark33ParquetReader implemented, and we use the spark adapter to fetch those
> implementations.
> *Done with spark reader explanation*
>
> Schema evolution was consolidated into a class,
> Spark3ParquetSchemaEvolutionUtils, to reduce duplicated code and to keep
> SparkParquetReader as close to the OSS implementation as possible for better
> maintainability.
>
> We generate the type change infos and cast the records like we do in the
> other spark reader, but schema.on.read is also handled here. Any pushed-down
> filters need to be rebuilt, and if the vectorized reader is used, we have a
> specialized vectorized reader that casts promoted columns.
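> To make the filter-rebuilding concern concrete, here is a hypothetical
> sketch (the Filter shape and names are invented for illustration, not
> Spark's actual pushdown filter API): a predicate expressed against the
> promoted reader type has to be re-expressed against the type physically
> stored in the file before it can be evaluated there.
> {code:java}
> public class FilterRebuildSketch {
>     // Minimal stand-in for an equality pushdown filter.
>     public static final class EqualTo {
>         public final String column;
>         public final Object value;
>         public EqualTo(String column, Object value) {
>             this.column = column;
>             this.value = value;
>         }
>     }
>
>     // If the column was promoted int->long for the reader, narrow the
>     // literal back to the file's physical type so the file-level filter
>     // compares values of the same type.
>     public static EqualTo rebuild(EqualTo f, java.util.Map<String, String> promotions) {
>         if ("int->long".equals(promotions.get(f.column)) && f.value instanceof Long) {
>             return new EqualTo(f.column, ((Long) f.value).intValue());
>         }
>         return f; // untouched columns keep their filters as-is
>     }
> }
> {code}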
> *How do we make this universal, or unify it across record formats (spark,
> avro, java, hive)?*
> We need to move the schema evolution handling into the fg reader, remove the
> specialized implementations, and make it generic. We will need to add
> interfaces for reading the write schema from the file metadata and for
> projecting the record before merging with log files. Additionally, we will
> need to modify pushdown filters as well.
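> A hypothetical sketch of the projection half of that idea (names and shapes
> are illustrative, not a proposed API; rows are Object[] and schemas are just
> ordered column lists): before merging with log records, each base-file
> record is projected from the schema the file was written with to the query
> schema, reordering columns and null-filling added ones.
> {code:java}
> import java.util.*;
>
> public class FgReaderEvolutionSketch {
>     // Project a row from the writer column order to the query column order.
>     // Type promotion would hook into the same per-column loop.
>     public static Object[] project(Object[] row, List<String> writerCols,
>                                    List<String> queryCols) {
>         Object[] out = new Object[queryCols.size()];
>         for (int i = 0; i < queryCols.size(); i++) {
>             int src = writerCols.indexOf(queryCols.get(i));
>             out[i] = src >= 0 ? row[src] : null; // added column -> null
>         }
>         return out;
>     }
> }
> {code}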
> Complications include:
> # Extra read of metadata might be costly; we might consider changing reader
> implementations to take the metadata as a param so we only need a single
> read.
> # For Hive, only reordering of top-level columns has been implemented. We
> still need to implement type promotions and nested columns.
> # WriteHandles have their own ways of dealing with schema evolution. If we
> remove the evolution from the spark and avro readers, we need to ensure the
> non-fg-reader-based write paths are still functional.
> # Performance cost of projection. Readers have built-in mechanisms for most
> cases of basic schema evolution. Do they have optimizations that our
> universal solution won't be able to take advantage of?
> # Non-fg reader paths. For spark, we avoid using the fg reader for file
> slices that consist of just a base file. This avoids the overhead of the fg
> reader and the overhead of sending the file slice to the executor.
> # Universal schema? Avro is somewhat difficult to work with. We have
> InternalSchema, but it is scala and not widely used in the codebase.
> # Vectorized reading. The fg reader does not support vectorized reading. To
> maintain schema evolution support for the spark vectorized reader, we will
> need to support vectorized reading in the fg reader, and implementations
> such as projection might require nontrivial work.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)