yihua commented on code in PR #11770:
URL: https://github.com/apache/hudi/pull/11770#discussion_r1720392755


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -105,15 +130,19 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                               options: Map[String, String],
                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
     //dataSchema is not always right due to spark bugs
-    val partitionColumns = partitionSchema.fieldNames
-    val preCombineField = 
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
-    val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f 
=> !partitionColumns.contains(f.name)
-      || preCombineField.equals(f.name)))
+    val dataSchema = tableSchema.structTypeSchema
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
     val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedStorageConf = new 
HadoopStorageConfiguration(hadoopConf).getInline
     setSchemaEvolutionConfigs(augmentedStorageConf)
-    val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, 
sanitizedTableName)
+    val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) = 
partitionSchema.fields.toSeq.zipWithIndex.filter(p => 
!mandatoryFields.contains(p._1.name)).unzip
+    //remainingPartitionSchema: the schema of the partition cols we want to 
append the value instead of reading from the file
+    val remainingPartitionSchema = StructType(remainingPartitionSchemaArr)
+    //fixedPartitionIndexes: index positions of the remainingPartitionSchema 
fields in partitionSchema
+    val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet
+    //requestedSchema: schema that we want fg reader to output to us

Review Comment:
   ```suggestion
       // schema that we want fg reader to output to us
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -105,15 +130,19 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                               options: Map[String, String],
                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
     //dataSchema is not always right due to spark bugs
-    val partitionColumns = partitionSchema.fieldNames
-    val preCombineField = 
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
-    val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f 
=> !partitionColumns.contains(f.name)
-      || preCombineField.equals(f.name)))
+    val dataSchema = tableSchema.structTypeSchema

Review Comment:
   `dataSchema is not always right due to spark bugs`: is this already fixed by 
Spark 3.3 and above?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -154,18 +183,22 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
               // Append partition values to rows and project to output schema
               appendPartitionAndProject(
                 reader.getClosableIterator,
-                requiredSchema,
-                partitionSchema,
+                requestedSchema,
+                remainingPartitionSchema,

Review Comment:
   One related question I have: does excluding timestamp-typed partition 
columns from the partition schema cause Spark to skip optimizations like 
dynamic partition pruning?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -105,15 +130,19 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                               options: Map[String, String],
                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
     //dataSchema is not always right due to spark bugs
-    val partitionColumns = partitionSchema.fieldNames
-    val preCombineField = 
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
-    val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f 
=> !partitionColumns.contains(f.name)
-      || preCombineField.equals(f.name)))
+    val dataSchema = tableSchema.structTypeSchema
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
     val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedStorageConf = new 
HadoopStorageConfiguration(hadoopConf).getInline
     setSchemaEvolutionConfigs(augmentedStorageConf)
-    val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, 
sanitizedTableName)
+    val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) = 
partitionSchema.fields.toSeq.zipWithIndex.filter(p => 
!mandatoryFields.contains(p._1.name)).unzip
+    //remainingPartitionSchema: the schema of the partition cols we want to 
append the value instead of reading from the file

Review Comment:
   ```suggestion
       // the schema of the partition cols we want to append the value instead 
of reading from the file
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -87,6 +91,27 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     supportBatchResult
   }
 
+  //for partition columns that we read from the file, we don't want them to be 
constant column vectors so we
+  //modify the vector types in this scenario
+  override def vectorTypes(requiredSchema: StructType,
+                           partitionSchema: StructType,

Review Comment:
   If the partition column is in `TimestampType`, should it be automatically 
translated to use the right vector type?  Or does the partition schema we 
prepare not have the correct type?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -105,15 +130,19 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                                               options: Map[String, String],
                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
     //dataSchema is not always right due to spark bugs
-    val partitionColumns = partitionSchema.fieldNames
-    val preCombineField = 
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
-    val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f 
=> !partitionColumns.contains(f.name)
-      || preCombineField.equals(f.name)))
+    val dataSchema = tableSchema.structTypeSchema
     val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
     val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedStorageConf = new 
HadoopStorageConfiguration(hadoopConf).getInline
     setSchemaEvolutionConfigs(augmentedStorageConf)
-    val requestedAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, 
sanitizedTableName)
+    val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) = 
partitionSchema.fields.toSeq.zipWithIndex.filter(p => 
!mandatoryFields.contains(p._1.name)).unzip
+    //remainingPartitionSchema: the schema of the partition cols we want to 
append the value instead of reading from the file
+    val remainingPartitionSchema = StructType(remainingPartitionSchemaArr)
+    //fixedPartitionIndexes: index positions of the remainingPartitionSchema 
fields in partitionSchema

Review Comment:
   ```suggestion
       // index positions of the remainingPartitionSchema fields in 
partitionSchema
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to