This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e50f9a  [HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137)
7e50f9a is described below

commit 7e50f9a5a6be8d68b13a587a07a9af81819540ca
Author: pengzhiwei <pengzhiwei2...@icloud.com>
AuthorDate: Thu Jun 24 13:48:01 2021 +0800

    [HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137)
---
 .../main/scala/org/apache/hudi/DefaultSource.scala    | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 32bd9a4..d9f64ba 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.log4j.LogManager
+import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -213,11 +214,27 @@ class DefaultSource extends RelationProvider
         classOf[HoodieROTablePathFilter],
         classOf[org.apache.hadoop.fs.PathFilter])
 
+      val specifySchema = if (schema == null) {
+        // Load the schema from the commit metadata.
+        // Use the schema of the latest commit here, since the table schema
+        // may have evolved across commits.
+        val tableSchemaResolver = new TableSchemaResolver(metaClient)
+        try {
+          Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema)
+            .dataType.asInstanceOf[StructType])
+        } catch {
+          case _: Throwable =>
+            None // If there is no commit in the table, we cannot get the schema
+                 // with tableSchemaResolver, so return None here.
+        }
+      } else {
+        Some(schema)
+      }
       // simply return as a regular relation
       DataSource.apply(
         sparkSession = sqlContext.sparkSession,
         paths = extraReadPaths,
-        userSpecifiedSchema = Option(schema),
+        userSpecifiedSchema = specifySchema,
         className = formatClassName,
         options = optParams)
         .resolveRelation()
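
Not part of the commit: the snippet below is a minimal, self-contained sketch of the schema-resolution pattern the diff relies on. The table base path is a placeholder, and the HoodieTableMetaClient.builder() call is assumed from Hudi releases of this era; the rest mirrors the calls in the diff. It resolves the latest commit's Avro schema, converts it to a Spark StructType via SchemaConverters, and falls back to None when the table has no commits, just like the new DefaultSource code path.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.StructType

    import scala.util.Try

    object LatestCommitSchemaSketch {
      def main(args: Array[String]): Unit = {
        // Placeholder path to an existing Hudi table.
        val basePath = "/tmp/hudi_trips_cow"

        // Build a meta client for the table (builder API assumed from Hudi 0.8+).
        val metaClient = HoodieTableMetaClient.builder()
          .setConf(new Configuration())
          .setBasePath(basePath)
          .build()

        // Resolve the Avro schema of the latest commit and convert it to a Spark
        // StructType. If the table has no commit yet, resolution fails and we
        // return None, mirroring the fallback added in DefaultSource above.
        val latestSchema: Option[StructType] = Try {
          val resolver = new TableSchemaResolver(metaClient)
          SchemaConverters.toSqlType(resolver.getTableAvroSchema)
            .dataType.asInstanceOf[StructType]
        }.toOption

        println(latestSchema.map(_.treeString).getOrElse("no commits yet; schema unavailable"))
      }
    }

In practice, a plain spark.read.format("hudi").load(basePath) with no user-provided .schema(...) should now reflect the latest commit schema even after the table's schema has evolved.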
