This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.11-0-apr21-5378-patched
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ae095b9a56ef2a25b8bb75406a51354b9f4ee13f
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Apr 21 13:01:08 2022 -0700

    Replicating to Spark 3.1
---
 .../datasources/parquet/Spark31HoodieParquetFileFormat.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index 30e7b9c78a..e99850bef0 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -154,8 +154,8 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
       val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
-      val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
       val fileSchema = if (shouldUseInternalSchema) {
+        val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
         val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
         InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
       } else {
@@ -223,13 +223,17 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       // Clone new conf
       val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-      if (shouldUseInternalSchema) {
+      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+
         hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+
+        SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+      } else {
+        new java.util.HashMap()
       }
+
       val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
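
For readers skimming the diff: the second hunk replaces a mutable `var typeChangeInfos`
(initialized to an empty map and reassigned inside an `if`) with a single `if`/`else`
expression, and the first hunk moves the `commitInstantTime` lookup into the branch that
actually consumes it. Below is a minimal, self-contained sketch of the same
expression-initialization pattern; the helper name `collectTypeChanges` and the simplified
map type are hypothetical stand-ins for the Hudi and Spark classes, not their real APIs.

    import java.util.{HashMap => JHashMap, Map => JMap}

    object ExpressionInitSketch {
      // Hypothetical stand-in for SparkInternalSchemaConverter.collectTypeChangedCols.
      def collectTypeChanges(): JMap[Integer, String] = {
        val changes = new JHashMap[Integer, String]()
        changes.put(0, "int -> long") // placeholder entry
        changes
      }

      // Before: a var bound to an empty map, then conditionally reassigned.
      def typeChangesBefore(shouldUseInternalSchema: Boolean): JMap[Integer, String] = {
        var typeChangeInfos: JMap[Integer, String] = new JHashMap()
        if (shouldUseInternalSchema) {
          typeChangeInfos = collectTypeChanges()
        }
        typeChangeInfos
      }

      // After: if/else is an expression in Scala, so the map is bound exactly once
      // and the empty-map default is confined to the else branch.
      def typeChangesAfter(shouldUseInternalSchema: Boolean): JMap[Integer, String] =
        if (shouldUseInternalSchema) collectTypeChanges()
        else new JHashMap()
    }

Binding the map in one expression also makes it eligible to become a `val`, and scoping
`commitInstantTime` inside the branch avoids parsing a commit time for files that never
consult the internal-schema cache.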
