This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.11-0-apr21-5378-patched
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 12b9b783b9c74519044200592d897bf90efcf3ad
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Apr 21 13:00:23 2022 -0700

    Hardening the code
---
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index 85f8918137..7135f19e95 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
-import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -220,13 +220,17 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       // Clone new conf
       val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
-      var typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = new java.util.HashMap()
-      if (shouldUseInternalSchema) {
+      val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
         val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
         val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
-        typeChangeInfos = SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+
         hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json)
+
+        SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema)
+      } else {
+        new java.util.HashMap()
       }
+
       val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopAttemptConf, attemptId)
@@ -395,6 +399,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
 object Spark32HoodieParquetFileFormat {
 
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
   private def createParquetFilters(args: Any*): ParquetFilters = {
     // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it
     //       up by arg types, and have to instead rely on the number of args based on individual class;
@@ -404,6 +411,9 @@ object Spark32HoodieParquetFileFormat {
       .asInstanceOf[ParquetFilters]
   }
 
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
   private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
     // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
     //       up by arg types, and have to instead rely on the number of args based on individual class;
@@ -413,6 +423,9 @@ object Spark32HoodieParquetFileFormat {
      .asInstanceOf[ParquetReadSupport]
   }
 
+  /**
+   * NOTE: This method is specific to Spark 3.2.0
+   */
   private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
     // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
     //       up by arg types, and have to instead rely on the number of args based on individual class;
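
The core change in the second hunk above is a common Scala hardening pattern: a mutable "var" that was conditionally reassigned becomes an immutable "val" bound to the whole if/else expression, so both branches must yield a value and the reference can never be reassigned afterwards. A minimal, self-contained sketch of that pattern follows; all names are illustrative stand-ins, not Hudi's actual identifiers.

object ValOverVarSketch extends App {
  val shouldUseInternalSchema = true // stand-in for the real flag

  // val + if/else expression: each branch must produce the map, and the
  // binding cannot be reassigned later in the method body
  val typeChangeInfos: java.util.Map[Integer, String] =
    if (shouldUseInternalSchema) {
      val collected = new java.util.HashMap[Integer, String]()
      collected.put(0, "int -> long") // stand-in for collectTypeChangedCols(...)
      collected
    } else {
      new java.util.HashMap() // empty result when schema evolution is off
    }

  println(typeChangeInfos) // prints {0=int -> long}
}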
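
The NOTE comments in the last three hunks also explain why createParquetFilters, createParquetReadSupport and createVectorizedParquetRecordReader go through reflection: the target constructors take a Scala enum, so they cannot be looked up by argument types and are instead selected by argument count. A hedged sketch of that lookup technique follows; the "instantiate" helper is hypothetical and not the Hudi or Spark implementation.

import java.lang.reflect.Constructor

object CtorByArgCountSketch {
  // Pick a constructor by parameter count rather than parameter types,
  // since one of the parameter types (a Scala enum) is awkward to name
  def instantiate[T](clazz: Class[T])(args: Any*): T = {
    val ctor: Constructor[_] = clazz.getDeclaredConstructors
      .find(_.getParameterCount == args.length)
      .getOrElse(throw new NoSuchMethodException(
        s"${clazz.getName} has no ${args.length}-arg constructor"))
    ctor.setAccessible(true)
    // java.lang.reflect expects boxed (AnyRef) arguments
    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[T]
  }

  def main(argv: Array[String]): Unit = {
    // Example: select java.awt.Point's two-arg (x, y) constructor purely by arity
    val p = instantiate(classOf[java.awt.Point])(3, 4)
    println(p)
  }
}

Selecting by arity alone is ambiguous when a class declares several constructors with the same parameter count, which is why the comments above stress that the expected argument count is keyed off the individual class (and Spark version).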
