alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r856510393
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -223,56 +228,62 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
val fileSplits = collectFileSplits(partitionFilters, dataFilters)
- val partitionSchema = if (dropPartitionColumnsWhenWrite) {
- // when hoodie.datasource.write.drop.partition.columns is true,
partition columns can't be persisted in
- // data files.
- StructType(partitionColumns.map(StructField(_, StringType)))
- } else {
- StructType(Nil)
- }
- val tableSchema = HoodieTableSchema(tableStructSchema, if
(internalSchema.isEmptySchema) tableAvroSchema.toString else
AvroInternalSchemaConverter.convert(internalSchema,
tableAvroSchema.getName).toString, internalSchema)
- val dataSchema = if (dropPartitionColumnsWhenWrite) {
- val dataStructType = StructType(tableStructSchema.filterNot(f =>
partitionColumns.contains(f.name)))
- HoodieTableSchema(
- dataStructType,
- sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType,
nullable = false, "record").toString()
- )
- } else {
- tableSchema
- }
- val requiredSchema = if (dropPartitionColumnsWhenWrite) {
- val requiredStructType = StructType(requiredStructSchema.filterNot(f =>
partitionColumns.contains(f.name)))
- HoodieTableSchema(
- requiredStructType,
- sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType,
nullable = false, "record").toString()
- )
+ val tableAvroSchemaStr =
+ if (internalSchema.isEmptySchema) tableAvroSchema.toString
+ else AvroInternalSchemaConverter.convert(internalSchema,
tableAvroSchema.getName).toString
+
+ val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr,
internalSchema)
+ val requiredSchema = HoodieTableSchema(requiredStructSchema,
requiredAvroSchema.toString, requiredInternalSchema)
+
+ // Since schema requested by the caller might contain partition columns,
we might need to
+ // prune it, removing all partition columns from it in case these columns
are not persisted
+ // in the data files
+ //
+ // NOTE: This partition schema is only relevant to file reader to be able
to embed
+ // values of partition columns (hereafter referred to as partition
values) encoded into
+ // the partition path, and omitted from the data file, back into
fetched rows;
+ // Note that, by default, partition columns are not omitted
therefore specifying
+ // partition schema for reader is not required
+ val (partitionSchema, dataSchema, prunedRequiredSchema) =
+ tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+ if (fileSplits.isEmpty) {
+ sparkSession.sparkContext.emptyRDD
} else {
- HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString,
requiredInternalSchema)
+ val rdd = composeRDD(fileSplits, partitionSchema, dataSchema,
prunedRequiredSchema, filters)
+
+ // NOTE: In case when partition columns have been pruned from the
required schema, we have to project
+ // the rows from the pruned schema back into the one expected by
the caller
+ val projectedRDD = if (prunedRequiredSchema.structTypeSchema !=
requiredSchema.structTypeSchema) {
+ rdd.mapPartitions { it =>
+ val fullPrunedSchema =
StructType(prunedRequiredSchema.structTypeSchema.fields ++
partitionSchema.fields)
+ val unsafeProjection = generateUnsafeProjection(fullPrunedSchema,
requiredSchema.structTypeSchema)
Review Comment:
@YannByron this is the problem you're hitting with mandatory columns -- when
you're filtering out partition columns from the schema, you actually
re-order the columns relative to what the caller (Spark) is expecting. The
caller simply projects the schema, assuming that BaseRelation will return
rows adhering to it, while the relation was in fact returning rows with the
columns reordered (with the partition columns appended at the end).
The proper fix for that is to do a projection here, back into the schema
that the caller expects.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -114,16 +114,37 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
+ // We're delegating to Spark to append partition values to every row only
in cases
+ // when these corresponding partition-values are not persisted w/in the
data file itself
+ val shouldAppendPartitionColumns = omitPartitionColumnsInFile
+
val (tableFileFormat, formatClassName) =
metaClient.getTableConfig.getBaseFileFormat match {
- case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+ case HoodieFileFormat.PARQUET =>
(sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get,
"hoodie-parquet")
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
}
if (globPaths.isEmpty) {
+ // NOTE: There are currently 2 ways partition values could be fetched:
+ // - Source columns (producing the values used for physical
partitioning) will be read
+ // from the data file
+ // - Values parsed from the actual partition pat would be
appended to the final dataset
Review Comment:
Addressed in a follow-up
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]