wombatu-kun commented on code in PR #18923:
URL: https://github.com/apache/hudi/pull/18923#discussion_r3408848761


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -96,29 +105,46 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
     })
   }
 
-  // Aligns log-block records with the PushVariantIntoScan-projected variant shape before
-  // they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
-  // _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the
-  // bare required schema would drop them and the merger would read garbage offsets.
-  override def getLogBlockRecordProjection(
-      dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow, InternalRow]] = {
-    val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f => f.dataType match {
+  // True only when there is a Spark 4.1 PushVariantIntoScan projection to apply AND the table is
+  // not using a custom (payload-based) merger. Payload-based tables round-trip records through
+  // PayloadUpdateProcessor.convertToAvroRecord against a schema that still types variant fields as
+  // VariantType, so a row already rewritten into the projected struct shape would be mis-decoded.
+  // Single source of truth for both reader paths (parquet native projection + avro rewrite).
+  private def shouldProjectVariants: Boolean = {
+    val hasVariantProjection = sparkRequiredSchema.exists(_.fields.exists(_.dataType match {
       case st: StructType => sparkAdapter.isVariantProjectionStruct(st)
       case _ => false
     }))
-    if (!needsProjection) {
-      return HOption.empty[JFunction[InternalRow, InternalRow]]()
+    // getRecordMerger() is a Lombok getter over a field initialized to null (not Option.empty());
+    // it stays null until setRecordMerger() runs during reader init, so the null guard is required.
+    val merger = getRecordMerger()
+    val isPayloadBased = merger != null && merger.isPresent && merger.get.getMergingStrategy == PAYLOAD_BASED_MERGE_STRATEGY_UUID
+    hasVariantProjection && !isPayloadBased
+  }
+
+  // Aligns avro log-block records with the PushVariantIntoScan-projected variant shape before
+  // they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
+  // _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the bare
+  // required schema would drop them and the merger would read garbage offsets. Parquet log blocks
+  // project natively in getFileRecordIterator, so only the avro log reader calls this hook.
+  override def projectLogBlockRecords(
+      recordIterator: ClosableIterator[InternalRow],
+      dataBlockSchema: HoodieSchema): ClosableIterator[InternalRow] = {
+    if (!shouldProjectVariants) {
+      return recordIterator
     }
     val req = sparkRequiredSchema.get

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to