This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.11-0-apr21-5378-patched
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ae095b9a56ef2a25b8bb75406a51354b9f4ee13f
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Apr 21 13:01:08 2022 -0700

    Replicating to Spark 3.1
---
 .../datasources/parquet/Spark31HoodieParquetFileFormat.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 30e7b9c78a..e99850bef0 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -154,8 +154,8 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && 
querySchemaOption.isPresent
 
       val tablePath = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
       val fileSchema = if (shouldUseInternalSchema) {
+        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = 
sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
         InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, 
tablePath, sharedConf, if (validCommits == null) "" else validCommits)
       } else {
@@ -223,13 +223,17 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
       // Clone new conf
       val hadoopAttemptConf = new 
Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
new java.util.HashMap()
-      if (shouldUseInternalSchema) {
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = 
if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = 
SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-        typeChangeInfos = 
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), 
mergedInternalSchema)
+
         hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
mergedSchema.json)
+
+        
SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), 
mergedInternalSchema)
+      } else {
+        new java.util.HashMap()
       }
+
       val hadoopAttemptContext =
         new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
 

Reply via email to