alexeykudinkin commented on code in PR #5708:
URL: https://github.com/apache/hudi/pull/5708#discussion_r927259929
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -564,42 +538,57 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     // we have to eagerly initialize all of the readers even though only one specific to the type
     // of the file being read will be used. This is required to avoid serialization of the whole
     // relation (containing file-index for ex) and passing it to the executor
-    val reader = tableBaseFileFormat match {
-      case HoodieFileFormat.PARQUET =>
-        HoodieDataSourceHelper.buildHoodieParquetReader(
-          sparkSession = spark,
-          dataSchema = dataSchema.structTypeSchema,
-          partitionSchema = partitionSchema,
-          requiredSchema = requiredSchema.structTypeSchema,
-          filters = filters,
-          options = options,
-          hadoopConf = hadoopConf,
-          // We're delegating to Spark to append partition values to every row only in cases
-          // when these corresponding partition-values are not persisted w/in the data file itself
-          appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
-        )
+    val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) =
+      tableBaseFileFormat match {
+        case HoodieFileFormat.PARQUET =>
+          (
+            HoodieDataSourceHelper.buildHoodieParquetReader(
+              sparkSession = spark,
+              dataSchema = dataSchema.structTypeSchema,
+              partitionSchema = partitionSchema,
+              requiredSchema = requiredSchema.structTypeSchema,
+              filters = filters,
+              options = options,
+              hadoopConf = hadoopConf,
+              // We're delegating to Spark to append partition values to every row only in cases
+              // when these corresponding partition-values are not persisted w/in the data file itself
+              appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
+            ),
+            // Since partition values by default are omitted, and not persisted w/in data-files by Spark,
+            // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading
+            // the data. As such, actual full schema produced by such reader is composed of
+            //   a) Prepended partition column values
+            //   b) Data-file schema (projected or not)
+            StructType(partitionSchema.fields ++ requiredSchema.structTypeSchema.fields)
Review Comment:
   It was actually a typo -- partition columns are actually appended (Spark's `ParquetFileFormat` has it wrong [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L237)).
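   The schema composition under discussion can be modelled without Spark: the reader's full output schema is simply the partition columns concatenated with the (possibly projected) data-file columns, mirroring `StructType(partitionSchema.fields ++ requiredSchema.structTypeSchema.fields)`. A minimal plain-Scala sketch (field names and the `Field`/`Schema` case classes are hypothetical stand-ins for Spark's `StructField`/`StructType`, so this compiles without a Spark dependency):

   ```scala
   // Hypothetical stand-ins for Spark's StructField / StructType
   case class Field(name: String, dataType: String)

   case class Schema(fields: Seq[Field]) {
     // Concatenate two schemas, as the reader does for partition + data columns
     def ++(other: Schema): Schema = Schema(fields ++ other.fields)
   }

   object SchemaCompositionSketch {
     def main(args: Array[String]): Unit = {
       // Illustrative columns only -- not taken from the PR
       val partitionSchema = Schema(Seq(Field("date", "string")))
       val requiredSchema  = Schema(Seq(Field("uuid", "string"), Field("fare", "double")))

       // Full schema produced by the reader: partition columns first,
       // then the projected data-file columns
       val fullSchema = partitionSchema ++ requiredSchema
       println(fullSchema.fields.map(_.name).mkString(","))
     }
   }
   ```

   Whether the partition columns end up prepended or appended in the concatenation is exactly the ordering question the comment above is about, which is why getting the `++` operand order right matters for downstream projections.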
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]