wombatu-kun commented on code in PR #18923:
URL: https://github.com/apache/hudi/pull/18923#discussion_r3408848761
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -96,29 +105,46 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
})
}
- // Aligns log-block records with the PushVariantIntoScan-projected variant shape before
- // they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
- // _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the
- // bare required schema would drop them and the merger would read garbage offsets.
- override def getLogBlockRecordProjection(
- dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow, InternalRow]] = {
- val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f => f.dataType match {
+ // True only when there is a Spark 4.1 PushVariantIntoScan projection to apply AND the table is
+ // not using a custom (payload-based) merger. Payload-based tables round-trip records through
+ // PayloadUpdateProcessor.convertToAvroRecord against a schema that still types variant fields as
+ // VariantType, so a row already rewritten into the projected struct shape would be mis-decoded.
+ // Single source of truth for both reader paths (parquet native projection + avro rewrite).
+ private def shouldProjectVariants: Boolean = {
+ val hasVariantProjection = sparkRequiredSchema.exists(_.fields.exists(_.dataType match {
case st: StructType => sparkAdapter.isVariantProjectionStruct(st)
case _ => false
}))
- if (!needsProjection) {
- return HOption.empty[JFunction[InternalRow, InternalRow]]()
+ // getRecordMerger() is a Lombok getter over a field initialized to null (not Option.empty());
+ // it stays null until setRecordMerger() runs during reader init, so the null guard is required.
+ val merger = getRecordMerger()
+ val isPayloadBased = merger != null && merger.isPresent && merger.get.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID
+ hasVariantProjection && !isPayloadBased
+ }
+
+ // Aligns avro log-block records with the PushVariantIntoScan-projected variant shape before
+ // they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
+ // _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the bare
+ // required schema would drop them and the merger would read garbage offsets. Parquet log blocks
+ // project natively in getFileRecordIterator, so only the avro log reader calls this hook.
+ override def projectLogBlockRecords(
+ recordIterator: ClosableIterator[InternalRow],
+ dataBlockSchema: HoodieSchema): ClosableIterator[InternalRow] = {
+ if (!shouldProjectVariants) {
+ return recordIterator
}
val req = sparkRequiredSchema.get
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]