YannByron commented on code in PR #5708:
URL: https://github.com/apache/hudi/pull/5708#discussion_r928123957
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -564,42 +538,56 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
// we have to eagerly initialize all of the readers even though only
one specific to the type
// of the file being read will be used. This is required to avoid
serialization of the whole
// relation (containing file-index for ex) and passing it to the
executor
- val reader = tableBaseFileFormat match {
- case HoodieFileFormat.PARQUET =>
- HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = spark,
- dataSchema = dataSchema.structTypeSchema,
- partitionSchema = partitionSchema,
- requiredSchema = requiredSchema.structTypeSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf,
- // We're delegating to Spark to append partition values to every row
only in cases
- // when these corresponding partition-values are not persisted w/in
the data file itself
- appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
- )
+ val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType)
=
+ tableBaseFileFormat match {
+ case HoodieFileFormat.PARQUET =>
+ val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
+ sparkSession = spark,
+ dataSchema = dataSchema.structTypeSchema,
+ partitionSchema = partitionSchema,
+ requiredSchema = requiredDataSchema.structTypeSchema,
+ filters = filters,
+ options = options,
+ hadoopConf = hadoopConf,
+ // We're delegating to Spark to append partition values to every
row only in cases
+ // when these corresponding partition-values are not persisted
w/in the data file itself
+ appendPartitionValues =
shouldExtractPartitionValuesFromPartitionPath
+ )
+ // Since partition values by default are omitted, and not persisted
w/in data-files by Spark,
+ // data-file readers (such as [[ParquetFileFormat]]) have to inject
partition values while reading
+ // the data. As such, actual full schema produced by such reader is
composed of
+ // a) Data-file schema (projected or not)
+ // b) Appended partition column values
+ val readerSchema =
StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
+
+ (parquetReader, readerSchema)
case HoodieFileFormat.HFILE =>
- createHFileReader(
+ val hfileReader = createHFileReader(
spark = spark,
dataSchema = dataSchema,
- requiredSchema = requiredSchema,
+ requiredDataSchema = requiredDataSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
)
+ (hfileReader, requiredDataSchema.structTypeSchema)
+
case _ => throw new UnsupportedOperationException(s"Base file format is
not currently supported ($tableBaseFileFormat)")
}
- partitionedFile => {
- val extension = FSUtils.getFileExtension(partitionedFile.filePath)
- if (tableBaseFileFormat.getFileExtension.equals(extension)) {
- reader.apply(partitionedFile)
- } else {
- throw new UnsupportedOperationException(s"Invalid base-file format
($extension), expected ($tableBaseFileFormat)")
Review Comment:
Can we move this file-extension check into
`HoodieDataSourceHelper.buildHoodieParquetReader` and `createHFileReader`,
respectively?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org