This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7e50f9a [HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137)
7e50f9a is described below
commit 7e50f9a5a6be8d68b13a587a07a9af81819540ca
Author: pengzhiwei <[email protected]>
AuthorDate: Thu Jun 24 13:48:01 2021 +0800
[HUDI-2061] Incorrect Schema Inference For Schema Evolved Table (#3137)
---
.../main/scala/org/apache/hudi/DefaultSource.scala | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 32bd9a4..d9f64ba 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
+import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -213,11 +214,27 @@ class DefaultSource extends RelationProvider
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])
+ val specifySchema = if (schema == null) {
+ // Load the schema from the commit meta data.
+ // Here we should specify the schema to the latest commit schema since
+ // the table schema evolution.
+ val tableSchemaResolver = new TableSchemaResolver(metaClient)
+ try {
+        Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema)
+ .dataType.asInstanceOf[StructType])
+ } catch {
+ case _: Throwable =>
+ None // If there is no commit in the table, we can not get the
schema
+ // with tableSchemaResolver, return None here.
+ }
+ } else {
+ Some(schema)
+ }
// simply return as a regular relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
paths = extraReadPaths,
- userSpecifiedSchema = Option(schema),
+ userSpecifiedSchema = specifySchema,
className = formatClassName,
options = optParams)
.resolveRelation()