This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 7e50f9a [HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137) 7e50f9a is described below commit 7e50f9a5a6be8d68b13a587a07a9af81819540ca Author: pengzhiwei <pengzhiwei2...@icloud.com> AuthorDate: Thu Jun 24 13:48:01 2021 +0800 [HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137) --- .../main/scala/org/apache/hudi/DefaultSource.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 32bd9a4..d9f64ba 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.log4j.LogManager +import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -213,11 +214,27 @@ class DefaultSource extends RelationProvider classOf[HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]) + val specifySchema = if (schema == null) { + // Load the schema from the commit meta data. + // Here we should specify the schema to the latest commit schema since + // the table schema evolution. 
+ val tableSchemaResolver = new TableSchemaResolver(metaClient) + try { + Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema) + .dataType.asInstanceOf[StructType]) + } catch { + case _: Throwable => + None // If there is no commit in the table, we cannot get the schema + // with tableSchemaResolver, return None here. + } + } else { + Some(schema) + } // simply return as a regular relation DataSource.apply( sparkSession = sqlContext.sparkSession, paths = extraReadPaths, - userSpecifiedSchema = specifySchema, + userSpecifiedSchema = specifySchema, className = formatClassName, options = optParams) .resolveRelation()