yihua commented on code in PR #11770:
URL: https://github.com/apache/hudi/pull/11770#discussion_r1720392755
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -105,15 +130,19 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
options: Map[String, String],
hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
//dataSchema is not always right due to spark bugs
- val partitionColumns = partitionSchema.fieldNames
- val preCombineField =
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
- val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f
=> !partitionColumns.contains(f.name)
- || preCombineField.equals(f.name)))
+ val dataSchema = tableSchema.structTypeSchema
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
val augmentedStorageConf = new
HadoopStorageConfiguration(hadoopConf).getInline
setSchemaEvolutionConfigs(augmentedStorageConf)
- val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema,
sanitizedTableName)
+ val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) =
partitionSchema.fields.toSeq.zipWithIndex.filter(p =>
!mandatoryFields.contains(p._1.name)).unzip
+ //remainingPartitionSchema: the schema of the partition cols we want to
append the value instead of reading from the file
+ val remainingPartitionSchema = StructType(remainingPartitionSchemaArr)
+ //fixedPartitionIndexes: index positions of the remainingPartitionSchema
fields in partitionSchema
+ val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet
+ //requestedSchema: schema that we want fg reader to output to us
Review Comment:
```suggestion
// schema that we want fg reader to output to us
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -105,15 +130,19 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
options: Map[String, String],
hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
//dataSchema is not always right due to spark bugs
- val partitionColumns = partitionSchema.fieldNames
- val preCombineField =
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
- val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f
=> !partitionColumns.contains(f.name)
- || preCombineField.equals(f.name)))
+ val dataSchema = tableSchema.structTypeSchema
Review Comment:
`dataSchema is not always right due to spark bugs`: is this already fixed by
Spark 3.3 and above?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -154,18 +183,22 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
// Append partition values to rows and project to output schema
appendPartitionAndProject(
reader.getClosableIterator,
- requiredSchema,
- partitionSchema,
+ requestedSchema,
+ remainingPartitionSchema,
Review Comment:
One related question I have: does excluding timestamp-typed partition
columns from the partition schema cause Spark to skip optimizations like
dynamic partition pruning?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -105,15 +130,19 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
options: Map[String, String],
hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
//dataSchema is not always right due to spark bugs
- val partitionColumns = partitionSchema.fieldNames
- val preCombineField =
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
- val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f
=> !partitionColumns.contains(f.name)
- || preCombineField.equals(f.name)))
+ val dataSchema = tableSchema.structTypeSchema
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
val augmentedStorageConf = new
HadoopStorageConfiguration(hadoopConf).getInline
setSchemaEvolutionConfigs(augmentedStorageConf)
- val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema,
sanitizedTableName)
+ val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) =
partitionSchema.fields.toSeq.zipWithIndex.filter(p =>
!mandatoryFields.contains(p._1.name)).unzip
+ //remainingPartitionSchema: the schema of the partition cols we want to
append the value instead of reading from the file
Review Comment:
```suggestion
// the schema of the partition cols we want to append the value instead
of reading from the file
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -87,6 +91,27 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
supportBatchResult
}
+ //for partition columns that we read from the file, we don't want them to be
constant column vectors so we
+ //modify the vector types in this scenario
+ override def vectorTypes(requiredSchema: StructType,
+ partitionSchema: StructType,
Review Comment:
If the partition column is in `TimestampType`, should it be automatically
translated to use the right vector type? Or does the partition schema we
prepare not have the correct type?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala:
##########
@@ -105,15 +130,19 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
options: Map[String, String],
hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
//dataSchema is not always right due to spark bugs
- val partitionColumns = partitionSchema.fieldNames
- val preCombineField =
options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
- val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f
=> !partitionColumns.contains(f.name)
- || preCombineField.equals(f.name)))
+ val dataSchema = tableSchema.structTypeSchema
val outputSchema = StructType(requiredSchema.fields ++
partitionSchema.fields)
val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
val augmentedStorageConf = new
HadoopStorageConfiguration(hadoopConf).getInline
setSchemaEvolutionConfigs(augmentedStorageConf)
- val requestedAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema,
sanitizedTableName)
+ val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) =
partitionSchema.fields.toSeq.zipWithIndex.filter(p =>
!mandatoryFields.contains(p._1.name)).unzip
+ //remainingPartitionSchema: the schema of the partition cols we want to
append the value instead of reading from the file
+ val remainingPartitionSchema = StructType(remainingPartitionSchemaArr)
+ //fixedPartitionIndexes: index positions of the remainingPartitionSchema
fields in partitionSchema
Review Comment:
```suggestion
// index positions of the remainingPartitionSchema fields in
partitionSchema
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]