alexeykudinkin commented on code in PR #5364:
URL: https://github.com/apache/hudi/pull/5364#discussion_r856510393


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -223,56 +228,62 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
-      // when hoodie.datasource.write.drop.partition.columns is true, 
partition columns can't be persisted in
-      // data files.
-      StructType(partitionColumns.map(StructField(_, StringType)))
-    } else {
-      StructType(Nil)
-    }
 
-    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)
-    val dataSchema = if (dropPartitionColumnsWhenWrite) {
-      val dataStructType = StructType(tableStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
-      HoodieTableSchema(
-        dataStructType,
-        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, 
nullable = false, "record").toString()
-      )
-    } else {
-      tableSchema
-    }
-    val requiredSchema = if (dropPartitionColumnsWhenWrite) {
-      val requiredStructType = StructType(requiredStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
-      HoodieTableSchema(
-        requiredStructType,
-        sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, 
nullable = false, "record").toString()
-      )
+    val tableAvroSchemaStr =
+      if (internalSchema.isEmptySchema) tableAvroSchema.toString
+      else AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString
+
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, 
internalSchema)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString, requiredInternalSchema)
+
+    // Since schema requested by the caller might contain partition columns, 
we might need to
+    // prune it, removing all partition columns from it in case these columns 
are not persisted
+    // in the data files
+    //
+    // NOTE: This partition schema is only relevant to file reader to be able 
to embed
+    //       values of partition columns (hereafter referred to as partition 
values) encoded into
+    //       the partition path, and omitted from the data file, back into 
fetched rows;
+    //       Note that, by default, partition columns are not omitted 
therefore specifying
+    //       partition schema for reader is not required
+    val (partitionSchema, dataSchema, prunedRequiredSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    if (fileSplits.isEmpty) {
+      sparkSession.sparkContext.emptyRDD
     } else {
-      HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, 
requiredInternalSchema)
+      val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, 
prunedRequiredSchema, filters)
+
+      // NOTE: In case when partition columns have been pruned from the 
required schema, we have to project
+      //       the rows from the pruned schema back into the one expected by 
the caller
+      val projectedRDD = if (prunedRequiredSchema.structTypeSchema != 
requiredSchema.structTypeSchema) {
+        rdd.mapPartitions { it =>
+          val fullPrunedSchema = 
StructType(prunedRequiredSchema.structTypeSchema.fields ++ 
partitionSchema.fields)
+          val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, 
requiredSchema.structTypeSchema)

Review Comment:
   @YannByron this is the problem you're hitting with mandatory columns -- when 
you filter out the partition columns from the schema, you actually re-order the 
columns relative to what the caller (Spark) was expecting. The caller was 
simply projecting the schema, assuming that BaseRelation would return rows 
adhering to that schema, while the relation was actually returning rows with 
the columns reordered (partition columns appended at the end).
   
   The proper fix for that was to perform a projection here, back into the 
schema that the caller expects.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -114,16 +114,37 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
    *       rule; you can find more details in HUDI-3896)
    */
   def toHadoopFsRelation: HadoopFsRelation = {
+    // We're delegating to Spark to append partition values to every row only 
in cases
+    // when these corresponding partition-values are not persisted w/in the 
data file itself
+    val shouldAppendPartitionColumns = omitPartitionColumnsInFile
+
     val (tableFileFormat, formatClassName) = 
metaClient.getTableConfig.getBaseFileFormat match {
-      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.PARQUET => 
(sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, 
"hoodie-parquet")
       case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
     }
 
     if (globPaths.isEmpty) {
+      // NOTE: There are currently 2 ways partition values could be fetched:
+      //          - Source columns (producing the values used for physical 
partitioning) will be read
+      //          from the data file
+      //          - Values parsed from the actual partition path would be 
appended to the final dataset

Review Comment:
   Addressed in a follow-up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to