vamshikrishnakyatham commented on code in PR #14003:
URL: https://github.com/apache/hudi/pull/14003#discussion_r2393106170
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala:
##########
@@ -286,16 +293,57 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
}
}
-  private def fullTableScanDataFrame(commitsToFilter: List[HoodieInstant]): DataFrame = {
+  private def fullTableScanDataFrame(commitsToFilter: List[HoodieInstant],
+                                     broadcastTimeMap: org.apache.spark.broadcast.Broadcast[Map[String, String]]): DataFrame = {
     val commitTimesToFilter = commitsToFilter.map(_.requestedTime)
     val hudiDF = sqlContext.read
       .format("hudi_v1")
-      .schema(usedSchema)
+      .schema(schema)
       .load(basePath.toString)
       .filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToFilter: _*))
-    // schema enforcement does not happen in above spark.read with hudi. hence selecting explicitly w/ right column order
-    val fieldNames = usedSchema.fieldNames
-    hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+    val fieldNames = schema.fieldNames
+    val selectedDf = hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+    val transformedRDD = addCompletionTimeColumn(selectedDf.rdd, broadcastTimeMap)
+    sqlContext.createDataFrame(transformedRDD, schema)
Review Comment:
@yihua For `MergeOnReadIncrementalRelationV2`, it's not straightforward to handle virtual schema columns because:
1. `buildScan` is final in the parent `HoodieBaseRelation`, so we cannot override it to filter out virtual columns before they reach `projectSchema`.
2. The call flow is fixed:
   - Spark calls `buildScan(requiredColumns, filters)`
   - `buildScan` immediately calls `projectSchema(avroSchema, requiredColumns)` at line 367
   - `projectSchema` looks up each column in the Avro schema's field map at line 819
   - Virtual columns don't exist in the Avro schema → `NoSuchElementException`
3. We can't intercept early enough:
   - `updatePrunedDataSchema` is called during planning, but doesn't affect the `requiredColumns` array passed to `buildScan`
   - `composeRDD` is called too late, after `projectSchema` has already failed
   - `appendMandatoryColumns` is private final, so it can't be overridden
4. The only solution for us would be to remove `final` from `buildScan` in `HoodieBaseRelation` (which was explicitly advised against), so we could override it to filter virtual columns before calling the parent implementation, then wrap the resulting RDD to append the virtual column values.

Without removing `final`, there is no interception point between Spark passing the columns and `projectSchema` validating them against the physical schema.
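For concreteness, the workaround described in point 4 would look roughly like the sketch below. This is a hypothetical fragment only: it assumes `final` were removed from `buildScan` (which was advised against), and `virtualColumnNames` and `appendVirtualColumns` are illustrative helpers, not existing Hudi APIs.

```scala
// Hypothetical sketch: assumes HoodieBaseRelation.buildScan is no longer
// final. `virtualColumnNames` and `appendVirtualColumns` are invented
// helper names used purely for illustration.
override def buildScan(requiredColumns: Array[String],
                       filters: Array[Filter]): RDD[Row] = {
  // Strip virtual columns so projectSchema only sees physical fields
  // that actually exist in the Avro schema (avoiding the
  // NoSuchElementException at the field-map lookup).
  val physicalColumns = requiredColumns.filterNot(virtualColumnNames.contains)

  // Delegate to the parent implementation with physical columns only.
  val physicalRdd = super.buildScan(physicalColumns, filters)

  // Wrap the resulting RDD to re-append the virtual column values
  // in the originally requested column order.
  appendVirtualColumns(physicalRdd, requiredColumns)
}
```

The point of the sketch is that the filtering has to happen before `super.buildScan` is invoked, which is exactly the interception point that `final` currently blocks.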
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]