This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b26f2b4e2ba fix(variant): align Spark 4.1 MOR merge with 
PushVariantIntoScan and restore Spark 4.0 reads (#18674)
1b26f2b4e2ba is described below

commit 1b26f2b4e2ba4b2dac108dc845c90daaf6e830c3
Author: voonhous <[email protected]>
AuthorDate: Fri May 15 05:09:27 2026 +0800

    fix(variant): align Spark 4.1 MOR merge with PushVariantIntoScan and 
restore Spark 4.0 reads (#18674)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/io/storage/HoodieSparkParquetReader.java  |  11 +-
 .../SparkFileFormatInternalRowReaderContext.scala  |  76 +++++++++++++-
 .../sql/avro/HoodieSparkSchemaConverters.scala     |  20 +++-
 .../parquet/HoodieParquetReadSupport.scala         |  48 +--------
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  40 ++++++-
 .../parquet/TestHoodieParquetReadSupport.scala     |  34 ------
 .../hudi/common/engine/HoodieReaderContext.java    |  10 ++
 .../table/read/buffer/FileGroupRecordBuffer.java   |  27 ++++-
 .../buffer/PositionBasedFileGroupRecordBuffer.java |  10 +-
 .../avro/AvroSchemaConverterWithTimestampNTZ.java  |  11 +-
 .../HoodieFileGroupReaderBasedFileFormat.scala     |  23 ++++-
 .../sql/hudi/dml/schema/TestVariantDataType.scala  |  10 --
 .../spark/sql/adapter/BaseSpark4Adapter.scala      |  19 +++-
 .../apache/spark/sql/adapter/Spark4_0Adapter.scala |  16 ++-
 .../parquet/Spark40HoodieParquetReadSupport.scala  | 115 +++++++++++++++++++++
 .../Spark40LegacyHoodieParquetFileFormat.scala     |   4 +-
 .../datasources/parquet/Spark40ParquetReader.scala |   2 +-
 .../TestSpark40HoodieParquetReadSupport.scala      |  59 +++++++++++
 .../apache/spark/sql/adapter/Spark4_1Adapter.scala |  59 ++++++++++-
 19 files changed, 474 insertions(+), 120 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 80ed9be8420e..d72409064a09 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -187,10 +187,13 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
                 
String.valueOf(storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(),
 sqlConf.parquetRecordFilterEnabled())));
           });
     }
-    ParquetReader<InternalRow> reader = ParquetReader.builder(new 
HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
-                rebaseDateSpec,
-                
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), 
messageSchema),
-            new Path(path.toUri()))
+    // Via SparkAdapter so Spark 4.0 picks up its variant-reordering 
ReadSupport subclass
+    // (#18334); constructing the base class here would fail with MALFORMED_VARIANT on 
Spark 4.0.
+    HoodieParquetReadSupport readSupport = 
SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetReadSupport(
+        Option$.MODULE$.empty(), true, true,
+        rebaseDateSpec,
+        messageSchema);
+    ParquetReader<InternalRow> reader = ParquetReader.builder(readSupport, new 
Path(path.toUri()))
         .withConf(storage.getConf().unwrapAs(Configuration.class))
         .build();
     UnsafeProjection projection = evolution.generateUnsafeProjection();
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 92b963f68390..edfcc44e9aa2 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.{HoodieFileFormat, 
HoodieRecord}
 import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils}
 import org.apache.hudi.common.table.HoodieTableConfig
 import 
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.collection.{CachingIterator, 
ClosableIterator, Pair => HPair}
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, 
HoodieSparkParquetReader, VectorConversionUtils}
@@ -38,11 +39,14 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, 
SparkColumnarFileReader}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{ArrayType, ByteType, DoubleType, FloatType, 
LongType, MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 
+import java.util.function.{Function => JFunction}
+
 import scala.collection.JavaConverters._
 
 /**
@@ -61,14 +65,62 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
                                               filters: Seq[Filter],
                                               requiredFilters: Seq[Filter],
                                               storageConfiguration: 
StorageConfiguration[_],
-                                              tableConfig: HoodieTableConfig)
+                                              tableConfig: HoodieTableConfig,
+                                              sparkRequiredSchema: 
Option[StructType] = None)
   extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig, 
SparkFileFormatInternalRecordContext.apply(tableConfig)) {
+
+  // Java-friendly auxiliary constructor (Scala default args don't generate 
matching Java overloads).
+  def this(baseFileReader: SparkColumnarFileReader,
+           filters: Seq[Filter],
+           requiredFilters: Seq[Filter],
+           storageConfiguration: StorageConfiguration[_],
+           tableConfig: HoodieTableConfig) =
+    this(baseFileReader, filters, requiredFilters, storageConfiguration, 
tableConfig, None)
+
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private lazy val recordKeyFields = 
Option(tableConfig.getRecordKeyFields.orElse(null)).map(_.map(_.toLowerCase).toSet).getOrElse(Set.empty)
   private lazy val bootstrapSafeFilters: Seq[Filter] = 
filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
   private lazy val morFilters = filters.filter(filterIsSafeForPrimaryKey(_, 
recordKeyFields)) ++ requiredFilters
   private lazy val allFilters = filters ++ requiredFilters
 
+  // For each field of `target`, replace its dataType with the matching 
field's projected
+  // variant struct from `source` (when present). Non-matching fields pass 
through.
+  private def overlayVariantProjections(target: StructType, source: 
StructType): StructType = {
+    StructType(target.fields.map { f =>
+      SparkFileFormatInternalRowReaderContext.findFieldByName(source, 
f.name).map(_.dataType) match {
+        case Some(projStruct: StructType) if 
sparkAdapter.isVariantProjectionStruct(projStruct) =>
+          f.copy(dataType = projStruct)
+        case _ => f
+      }
+    })
+  }
+
+  // Aligns log-block records with the PushVariantIntoScan-projected variant 
shape before
+  // they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
+  // _tmp_metadata_row_index) which the merger reads by ordinal — projecting 
down to the
+  // bare required schema would drop them and the merger would read garbage 
offsets.
+  override def getLogBlockRecordProjection(
+      dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow, 
InternalRow]] = {
+    val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f => 
f.dataType match {
+      case st: StructType => sparkAdapter.isVariantProjectionStruct(st)
+      case _ => false
+    }))
+    if (!needsProjection) {
+      return HOption.empty[JFunction[InternalRow, InternalRow]]()
+    }
+    val req = sparkRequiredSchema.get
+    val dataStruct = HoodieInternalRowUtils.getCachedSchema(dataBlockSchema)
+    val targetStruct = overlayVariantProjections(dataStruct, req)
+    sparkAdapter.buildVariantProjector(dataStruct, targetStruct) match {
+      case Some(p) => HOption.of(new JFunction[InternalRow, InternalRow] {
+        // .copy() because the buffer stores rows into ExternalSpillableMap and
+        // UnsafeProjection reuses a single output buffer.
+        override def apply(r: InternalRow): InternalRow = p(r).copy()
+      })
+      case None => HOption.empty[JFunction[InternalRow, InternalRow]]()
+    }
+  }
+
   override def getFileRecordIterator(filePath: StoragePath,
                                      start: Long,
                                      length: Long,
@@ -79,7 +131,14 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
     if (hasRowIndexField) {
       assert(getRecordContext.supportsParquetRowIndex())
     }
-    val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
+    // Use the engine's augmented requiredSchema (includes merger metadata 
cols the merger
+    // reads from base rows), but overlay the projected variant shape from 
sparkRequiredSchema
+    // so parquet-mr's PushVariantIntoScan kicks in (HoodieSchema collapses 
the projected
+    // struct back to VariantType, dropping the VariantMetadata parquet-mr 
looks for).
+    val structType = sparkRequiredSchema match {
+      case Some(sparkReq) => 
overlayVariantProjections(HoodieInternalRowUtils.getCachedSchema(requiredSchema),
 sparkReq)
+      case None => HoodieInternalRowUtils.getCachedSchema(requiredSchema)
+    }
 
     // Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY, so the reader needs 
BinaryType
     // and we decode back to ArrayType below. Lance returns ArrayType 
natively, so skip
@@ -100,8 +159,10 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
     val (readSchema, readFilters) = 
getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField)
     if (FSUtils.isLogFile(filePath)) {
       // NOTE: now only primary key based filtering is supported for log files
+      // Variant alignment happens later via getLogBlockRecordProjection in 
the merge buffer.
       new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
-        
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema, 
readFilters.asJava).asInstanceOf[ClosableIterator[InternalRow]]
+        
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema, 
readFilters.asJava)
+        .asInstanceOf[ClosableIterator[InternalRow]]
     } else {
       // partition value is empty because the spark parquet reader will append 
the partition columns to
       // each row if they are given. That is the only usage of the partition 
values in the reader.
@@ -274,6 +335,15 @@ class 
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
 }
 
 object SparkFileFormatInternalRowReaderContext {
+  /** Look up a field by name, honoring `spark.sql.caseSensitive`. */
+  private[hudi] def findFieldByName(schema: StructType, name: String): 
Option[StructField] = {
+    if (SQLConf.get.caseSensitiveAnalysis) {
+      schema.fields.find(_.name == name)
+    } else {
+      schema.fields.find(_.name.equalsIgnoreCase(name))
+    }
+  }
+
   // From "namedExpressions.scala": Used to construct to record position field 
metadata.
   private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = 
"__file_source_generated_metadata_col"
   private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
index f634b773f7ea..c1c3fe1f6837 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -18,7 +18,7 @@
 
 package org.apache.spark.sql.avro
 
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
 import org.apache.hudi.common.schema.{HoodieJsonProperties, HoodieSchema, 
HoodieSchemaField, HoodieSchemaType}
 import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
 import org.apache.hudi.internal.schema.HoodieSchemaException
@@ -190,6 +190,12 @@ object HoodieSparkSchemaConverters extends 
SparkAdapterSupport {
         isCanonicalVariantStruct(variantStruct) =>
         HoodieSchema.createVariant(recordName, nameSpace, null)
 
+      // PushVariantIntoScan (Spark 4.1+) rewrites Variant to a struct of 
extractions; map
+      // it back to a regular HoodieSchema Variant. parquet-mr does the 
projection natively
+      // from the Spark required schema's VariantMetadata.
+      case projectedVariant: StructType if 
isSparkVariantProjectionStruct(projectedVariant) =>
+        HoodieSchema.createVariant(recordName, nameSpace, null)
+
       case st: StructType =>
         val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" 
else recordName
 
@@ -514,6 +520,18 @@ object HoodieSparkSchemaConverters extends 
SparkAdapterSupport {
       }
   }
 
+  /**
+   * Detects a Spark 4.1 PushVariantIntoScan-projected struct. Short-circuits 
before consulting
+   * the version-specific SparkAdapter so shared-module unit tests (whose 
classpath lacks any
+   * SparkXAdapter) don't trigger an adapter-load failure on plain StructType 
conversions.
+   */
+  private def isSparkVariantProjectionStruct(st: StructType): Boolean = {
+    if (!HoodieSparkUtils.gteqSpark4_1) return false
+    if (st.fields.forall(_.metadata == Metadata.empty)) return false
+    try sparkAdapter.isVariantProjectionStruct(st)
+    catch { case _: NoClassDefFoundError | _: ClassNotFoundException => false }
+  }
+
   private def sparkTypeForVectorElementType(
                                              elementType: 
HoodieSchema.Vector.VectorElementType): DataType = elementType match {
     case HoodieSchema.Vector.VectorElementType.FLOAT => FloatType
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
index 5d77bd2fa3d7..c63ec0eef9e4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils
 
 import org.apache.parquet.hadoop.api.InitContext
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
-import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, 
SchemaRepair, Type, Types}
+import org.apache.parquet.schema.{GroupType, MessageType, SchemaRepair, Type, 
Types}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 
 import java.time.ZoneId
@@ -50,15 +50,7 @@ class HoodieParquetReadSupport(
       readContext.getRequestedSchema
     }
     val trimmedParquetSchema = 
HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema, 
context.getFileSchema)
-    // TODO: Remove this workaround once Spark is bumped to 4.1+, which reads 
variant fields by
-    //  name via SPARK-54410. Spark 4.0.x's ParquetUnshreddedVariantConverter 
builds its converters
-    //  array in hardcoded [value, metadata] order, then indexes by schema 
position. If the Parquet
-    //  schema has [metadata, value] order (per spec), the positional mismatch 
causes
-    //  MALFORMED_VARIANT. Workaround: reorder variant group fields to [value, 
metadata] in the
-    //  requested schema. parquet-mr reconciles requested vs file schema by 
field name, so bytes
-    //  flow correctly. This is tracked in issue #18334
-    val reorderedSchema = 
HoodieParquetReadSupport.reorderVariantFields(trimmedParquetSchema)
-    new ReadContext(reorderedSchema, readContext.getReadSupportMetadata)
+    new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata)
   }
 }
 
@@ -83,42 +75,6 @@ object HoodieParquetReadSupport {
     Types.buildMessage().addFields(trimmedFields: 
_*).named(requestedSchema.getName)
   }
 
-  /**
-   * Reorders variant group fields in the requested schema so that "value" 
precedes "metadata".
-   * This works around Spark 4.0.x's ParquetUnshreddedVariantConverter, which 
builds its
-   * converters array in hardcoded [value, metadata] order and indexes by 
schema position.
-   * parquet-mr reconciles the requested schema against the file schema by 
field name,
-   * so the correct bytes still flow to the correct converters regardless of 
file order.
-   */
-  def reorderVariantFields(schema: MessageType): MessageType = {
-    val reordered = 
schema.getFields.asScala.map(reorderVariantType).toArray[Type]
-    Types.buildMessage().addFields(reordered: _*).named(schema.getName)
-  }
-
-  private def reorderVariantType(t: Type): Type = {
-    t match {
-      case group: GroupType if isVariantGroup(group) =>
-        // Rebuild with [value, metadata] order for Spark compatibility
-        val valueField = group.getType("value")
-        val metadataField = group.getType("metadata")
-        group.withNewFields(java.util.Arrays.asList(valueField, metadataField))
-      case group: GroupType =>
-        // Recurse into nested groups
-        val children = group.getFields.asScala.map(reorderVariantType).asJava
-        group.withNewFields(children)
-      case _ => t
-    }
-  }
-
-  private def isVariantGroup(group: GroupType): Boolean = {
-    group.containsField("value") &&
-      group.containsField("metadata") &&
-      group.getType("value").isPrimitive &&
-      group.getType("metadata").isPrimitive &&
-      group.getType("value").asPrimitiveType().getPrimitiveTypeName == 
PrimitiveType.PrimitiveTypeName.BINARY &&
-      group.getType("metadata").asPrimitiveType().getPrimitiveTypeName == 
PrimitiveType.PrimitiveTypeName.BINARY
-  }
-
   private def trimParquetType(requestedType: Type, fileType: Type): 
Option[Type] = {
     if (requestedType.equals(fileType)) {
       Some(requestedType)
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 4eb680e1efc7..2934536f4ec9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
+import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.storage.StorageConfiguration
 
 import org.apache.hadoop.conf.Configuration
@@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
ParquetFilters}
+import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetReadSupport, 
ParquetFileFormat, ParquetFilters}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -242,6 +243,21 @@ trait SparkAdapter extends Serializable {
                             options: Map[String, String],
                             hadoopConf: Configuration): 
Option[SparkColumnarFileReader]
 
+  /**
+   * Builds the [[HoodieParquetReadSupport]] for a parquet read. Spark 4.0 
overrides this to return
+   * its variant-aware subclass (which reorders variant group fields for the 
positional converter).
+   * The int96 rebase mode is fixed to LEGACY (Hudi convention for timestamp 
compatibility).
+   */
+  def createParquetReadSupport(convertTz: Option[java.time.ZoneId],
+                               enableVectorizedReader: Boolean,
+                               enableTimestampFieldRepair: Boolean,
+                               datetimeRebaseSpec: RebaseSpec,
+                               tableSchemaOpt: HOption[MessageType])
+      : HoodieParquetReadSupport = {
+    new HoodieParquetReadSupport(convertTz, enableVectorizedReader, 
enableTimestampFieldRepair,
+      datetimeRebaseSpec, getRebaseSpec("LEGACY"), tableSchemaOpt)
+  }
+
   /**
    * use new qe execute
    */
@@ -466,6 +482,28 @@ trait SparkAdapter extends Serializable {
    */
   def isVariantShreddingStruct(structType: StructType): Boolean
 
+  /**
+   * Checks if a StructType is the result of Spark 4.1's PushVariantIntoScan 
rewriting — i.e.,
+   * every child field carries `VariantMetadata` describing a pushed-down 
variant extraction.
+   *
+   * Returns false on Spark versions earlier than 4.1 (the rewriting only 
happens there).
+   */
+  def isVariantProjectionStruct(structType: StructType): Boolean = false
+
+  /**
+   * If `sparkRequiredSchema` contains any field that's a Spark 4.1 variant 
projection struct
+   * (i.e., the same-named field in `sparkDataSchema` is `VariantType`), 
returns a row
+   * transformer that takes an InternalRow in the data-schema shape (with full 
variants) and
+   * produces an InternalRow in the required-schema shape (with each variant 
column projected
+   * to its requested struct via VariantGet).
+   *
+   * Used on the MOR log-file path: log records carry the full variant on 
disk, but the merger
+   * expects rows aligned to the post-PushVariantIntoScan required schema. 
Returns None when
+   * there's nothing to project (cheap fast-path for Spark < 4.1 and for 
non-variant queries).
+   */
+  def buildVariantProjector(sparkDataSchema: StructType,
+                            sparkRequiredSchema: StructType): 
Option[InternalRow => InternalRow] = None
+
   /**
    * Generates a shredded Variant schema and marks it with write shredding 
metadata.
    *
diff --git 
a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
 
b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
index 6c2ec5b725cd..973454d1b70c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
+++ 
b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
@@ -113,38 +113,4 @@ class TestHoodieParquetReadSupport {
         .named("required")
     Assertions.assertEquals(expectedSchema, trimmedSchema)
   }
-
-  /**
-   * Validate that reorderVariantFields does not treat groups as variant when 
the value/metadata
-   * fields fail the type checks in isVariantGroup. Each sub-group exercises a 
different false
-   * branch of the short-circuit && chain (lines 116-119).
-   */
-  @Test
-  def testReorderVariantFields_nonVariantGroupsUnchanged(): Unit = {
-    val schema = Types.buildMessage()
-      // value is non-primitive → line 116 false
-      .addField(Types.requiredGroup()
-        
.addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("value"))
-        .addField(Types.required(PrimitiveTypeName.BINARY).named("metadata"))
-        .named("g1"))
-      // value is primitive, metadata is non-primitive → line 117 false
-      .addField(Types.requiredGroup()
-        .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
-        
.addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("metadata"))
-        .named("g2"))
-      // both primitive but non-BINARY → line 118 false
-      .addField(Types.requiredGroup()
-        .addField(Types.required(PrimitiveTypeName.INT32).named("value"))
-        .addField(Types.required(PrimitiveTypeName.INT32).named("metadata"))
-        .named("g3"))
-      // value is BINARY, metadata is non-BINARY primitive → line 119 false
-      .addField(Types.requiredGroup()
-        .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
-        .addField(Types.required(PrimitiveTypeName.INT32).named("metadata"))
-        .named("g4"))
-      .named("test")
-
-    val result = HoodieParquetReadSupport.reorderVariantFields(schema)
-    Assertions.assertEquals(schema, result)
-  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 393dc8e789f6..953014802cf2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -56,6 +56,7 @@ import org.apache.hudi.storage.StoragePathInfo;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
 import static 
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
@@ -383,6 +384,15 @@ public abstract class HoodieReaderContext<T> {
                                                             HoodieSchema 
dataRequiredSchema,
                                                             List<Pair<String, 
Object>> requiredPartitionFieldAndValues);
 
+  /**
+   * Optional per-row transformer applied to log-block records before they 
reach the merger.
+   * Engines override this to align records with a projected read schema (e.g. 
Spark 4.1's
+   * PushVariantIntoScan). Default is no projection.
+   */
+  public Option<Function<T, T>> getLogBlockRecordProjection(HoodieSchema 
dataBlockSchema) {
+    return Option.empty();
+  }
+
   public Option<Pair<String, String>> getPayloadClasses(TypedProperties props) 
{
     return getRecordMerger().map(merger -> {
       if 
(merger.getMergingStrategy().equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
index a0eef45b137b..702ecf5041e0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
@@ -199,9 +199,8 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
       } else {
         blockRecordsIterator = 
dataBlock.getEngineRecordIterator(readerContext);
       }
-      Pair<Function<T, T>, HoodieSchema> schemaTransformerWithEvolvedSchema = 
getSchemaTransformerWithEvolvedSchema(dataBlock);
-      return Pair.of(new CloseableMappingIterator<>(
-          blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), 
schemaTransformerWithEvolvedSchema.getRight());
+      Pair<Function<T, T>, HoodieSchema> projectedTransformer = 
getProjectedTransformer(dataBlock);
+      return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, 
projectedTransformer.getLeft()), projectedTransformer.getRight());
     } catch (IOException e) {
       throw new HoodieIOException("Failed to deser records from log files ", 
e);
     }
@@ -281,6 +280,28 @@ abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordBuffer<T
     return Pair.of(transformer, evolvedSchema);
   }
 
+  /**
+   * Composes schema evolution then the engine's optional log-block record 
projection
+   * (currently only Spark 4.1's PushVariantIntoScan). Returns the evolved 
data-block schema
+   * — the projector preserves field shape, only rewriting variant fields, so 
merger
+   * metadata cols (read by ordinal) stay intact.
+   *
+   * <p>Skipped when a custom payload class is configured: {@code 
PayloadUpdateProcessor}
+   * round-trips through {@code convertToAvroRecord} against a schema that 
still types
+   * variant fields as {@code VariantType}, which would mis-decode rewritten 
rows.
+   */
+  protected Pair<Function<T, T>, HoodieSchema> 
getProjectedTransformer(HoodieDataBlock dataBlock) {
+    Pair<Function<T, T>, HoodieSchema> evolved = 
getSchemaTransformerWithEvolvedSchema(dataBlock);
+    if (payloadClasses.isPresent()) {
+      return evolved;
+    }
+    Option<Function<T, T>> logProjOpt = 
readerContext.getLogBlockRecordProjection(evolved.getRight());
+    if (!logProjOpt.isPresent()) {
+      return evolved;
+    }
+    return Pair.of(evolved.getLeft().andThen(logProjOpt.get()), 
evolved.getRight());
+  }
+
   private static class LogRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
     private final FileGroupRecordBuffer<T> fileGroupRecordBuffer;
     private final Iterator<BufferedRecord<T>> logRecordIterator;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
index 18f9acc43511..73dd5dda5e87 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
@@ -126,9 +126,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
           partialUpdateModeOpt);
     }
 
-    Pair<Function<T, T>, HoodieSchema> schemaTransformerWithEvolvedSchema = 
getSchemaTransformerWithEvolvedSchema(dataBlock);
+    Pair<Function<T, T>, HoodieSchema> projectedTransformer = 
getProjectedTransformer(dataBlock);
 
-    HoodieSchema schema = 
HoodieSchemaCache.intern(schemaTransformerWithEvolvedSchema.getRight());
+    HoodieSchema schema = 
HoodieSchemaCache.intern(projectedTransformer.getRight());
 
     // TODO: Return an iterator that can generate sequence number with the 
record.
     //       Then we can hide this logic into data block.
@@ -144,9 +144,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
         }
 
         long recordPosition = recordPositions.get(recordIndex++);
-        T evolvedNextRecord = 
schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
-        boolean isDelete = 
readerContext.getRecordContext().isDeleteRecord(evolvedNextRecord, 
deleteContext);
-        BufferedRecord<T> bufferedRecord = 
BufferedRecords.fromEngineRecord(evolvedNextRecord, schema, 
readerContext.getRecordContext(), orderingFieldNames, isDelete);
+        T projectedNextRecord = 
projectedTransformer.getLeft().apply(nextRecord);
+        boolean isDelete = 
readerContext.getRecordContext().isDeleteRecord(projectedNextRecord, 
deleteContext);
+        BufferedRecord<T> bufferedRecord = 
BufferedRecords.fromEngineRecord(projectedNextRecord, schema, 
readerContext.getRecordContext(), orderingFieldNames, isDelete);
         processNextDataRecord(bufferedRecord, recordPosition);
       }
     }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
 
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
index 8895186fd055..c28a9a83c278 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.schema.ConversionPatterns;
@@ -77,6 +78,7 @@ import static 
org.apache.parquet.schema.Type.Repetition.REPEATED;
  * This was taken from parquet-java 1.13.1 AvroSchemaConverter and modified
  * to support local timestamp types by copying a few methods from 1.14.0 
AvroSchemaConverter.
  */
+@Slf4j
 @SuppressWarnings("all")
 public class AvroSchemaConverterWithTimestampNTZ extends 
HoodieAvroParquetSchemaConverter {
 
@@ -478,7 +480,14 @@ public class AvroSchemaConverterWithTimestampNTZ extends 
HoodieAvroParquetSchema
           public java.util.Optional<HoodieSchema> 
visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
             return 
java.util.Optional.of(HoodieSchema.create(HoodieSchemaType.STRING));
           }
-        }).orElseThrow(() -> new UnsupportedOperationException("Cannot convert 
Parquet type " + parquetType));
+        }).orElseGet(() -> {
+          // Unrecognized annotation (e.g., parquet 1.16.0+ 
VariantLogicalTypeAnnotation, not
+          // available on the parquet version we compile against). Fall back 
to record
+          // conversion, correct for the variant binary group (`metadata`, 
`value`) we write.
+          log.debug("Unrecognized parquet LogicalTypeAnnotation '{}' on group 
'{}', falling back to record conversion",
+              logicalTypeAnnotation, parquetGroupType.getName());
+          return convertFields(parquetGroupType.getName(), 
parquetGroupType.getFields(), names);
+        });
       } else {
         // if no original type then it's a record
         return convertFields(parquetGroupType.getName(), 
parquetGroupType.getFields(), names);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 3da22ff8ebe7..de4ffb400d4c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -151,6 +151,21 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
       supportVectorizedRead = false
       supportReturningBatch = false
       false
+    } else if (schema.fields.exists(f => f.dataType.isInstanceOf[StructType]
+        && 
sparkAdapter.isVariantProjectionStruct(f.dataType.asInstanceOf[StructType]))) {
+      // Spark 4.1's PushVariantIntoScan rewrites a variant column to a struct 
of pushed-down
+      // extractions. The Spark vectorized parquet reader treats this as a 
nested type change
+      // (data column is VariantType, required is a struct) and refuses to 
read in vectorized
+      // mode (ParquetSchemaEvolutionUtils throws). Force row-based reading on 
this path.
+      supportVectorizedRead = false
+      supportReturningBatch = false
+      false
+    } else if (HoodieSparkUtils.gteqSpark4_1 && schema.fields.exists(f => 
sparkAdapter.isVariantType(f.dataType))) {
+      // #18605: Spark 4.1's vectorized variant read produces UnsafeRow 
encodings that SIGBUS
+      // during RangePartitioner sampling. Force row-based reads. Spark 4.0 
unaffected.
+      supportVectorizedRead = false
+      supportReturningBatch = false
+      false
     } else {
       val conf = sparkSession.sessionState.conf
       val parquetBatchSupported = 
ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && 
supportBatchWithTableSchema
@@ -173,7 +188,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
       }
       supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch
       supportReturningBatch = !isMOR && supportVectorizedRead
-      logInfo(s"supportReturningBatch: $supportReturningBatch, 
supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " 
+
+      logDebug(s"supportReturningBatch: $supportReturningBatch, 
supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " 
+
         s"isBootstrap: $isBootstrap, superSupportBatch: $supportBatch")
       supportReturningBatch
     }
@@ -222,7 +237,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     val superSplitable = super.isSplitable(sparkSession, options, path)
     val isLance = hoodieFileFormat == HoodieFileFormat.LANCE
     val splitable = !isMOR && !isIncremental && !isBootstrap && !isLance && 
superSplitable
-    logInfo(s"isSplitable: $splitable, super.isSplitable: $superSplitable, 
isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap")
+    logDebug(s"isSplitable: $splitable, super.isSplitable: $superSplitable, 
isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap")
     splitable
   }
 
@@ -286,7 +301,9 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
             .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
           fileSliceMapping.getSlice(fileGroupName) match {
             case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || 
fileSlice.getLogFiles.findAny().isPresent) =>
-              val readerContext = new 
SparkFileFormatInternalRowReaderContext(fileGroupBaseFileReader.value, filters, 
requiredFilters, storageConf, metaClient.getTableConfig)
+              val readerContext = new SparkFileFormatInternalRowReaderContext(
+                fileGroupBaseFileReader.value, filters, requiredFilters, 
storageConf, metaClient.getTableConfig,
+                sparkRequiredSchema = Some(requiredSchema))
               
readerContext.setEnableLogicalTimestampFieldRepair(storageConf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR,
 true))
               val props = metaClient.getTableConfig.getProps
               options.foreach(kv => props.setProperty(kv._1, kv._2))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index 56bb3270b5a1..e364772e0596 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -33,20 +33,10 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, LongType, 
MapType, MetadataBuilder, StringType, StructField, StructType}
-import org.scalatest.{Canceled, Outcome}
 
 
 class TestVariantDataType extends HoodieSparkSqlTestBase {
 
-  // TODO(#18605): Re-enable after fixing JVM SIGSEGV crash on Spark 4.1
-  override def withFixture(test: NoArgTest): Outcome = {
-    if (HoodieSparkUtils.gteqSpark4_1) {
-      Canceled("Disabled on Spark 4.1 due to JVM SIGSEGV crash in variant data 
type tests")
-    } else {
-      super.withFixture(test)
-    }
-  }
-
   test(s"Test Table with Variant Data Type") {
     // Variant type is only supported in Spark 4.0+
     assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or 
higher")
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index cca901a7d362..ea6e96943a69 100644
--- 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.spark.internal.ReflectUtil
 import org.apache.hudi.storage.StorageConfiguration
 
-import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type, 
Types}
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
@@ -198,6 +198,14 @@ abstract class BaseSpark4Adapter extends SparkAdapter with 
Logging {
     (requiredType, fileType) match {
       case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) => 
Some(true)
       case (s: StructType, _: VariantType) if isVariantPhysicalSchema(s) => 
Some(true)
+      // Spark 4.1's PushVariantIntoScan rewrites a `v: VariantType` column 
into a
+      // pushed-down projection struct (each child carries `VariantMetadata`). 
When the file
+      // stores `v` as a real Variant, the projection struct is NOT a type 
change — parquet-mr
+      // reads the variant natively and projects per-row using the field 
metadata. Treat the
+      // pair as compatible so Hudi's schema-change machinery doesn't rewrite 
the requested
+      // schema back to `VariantType` (which would lose the projection 
metadata).
+      case (s: StructType, _: VariantType) if isVariantProjectionStruct(s) => 
Some(true)
+      case (_: VariantType, s: StructType) if isVariantProjectionStruct(s) => 
Some(true)
       case _ => None // Not a VariantType comparison, use default logic
     }
   }
@@ -245,13 +253,16 @@ abstract class BaseSpark4Adapter extends SparkAdapter 
with Logging {
     // VariantType is always stored in Parquet as a struct with separate value 
and metadata binary fields.
     // This matches how the HoodieRowParquetWriteSupport writes variant data.
     // Note: We intentionally omit 'typed_value' for shredded variants as this 
writer only accesses raw binary blobs.
-    // TODO: use `.as(LogicalTypeAnnotation.variantType())` after parquet-java 
version is bumped to 1.16.0
-    Types.buildGroup(repetition)
+    // The variant LogicalTypeAnnotation is applied via 
applyVariantLogicalType, Spark 4.0 (parquet 1.15.2)
+    // is a no-op since the annotation only exists in parquet 1.16.0+; Spark 
4.1 overrides to apply it.
+    val builder = Types.buildGroup(repetition)
       .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Repetition.REQUIRED).named(HoodieSchema.Variant.VARIANT_METADATA_FIELD))
       .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
valueRepetition).named(HoodieSchema.Variant.VARIANT_VALUE_FIELD))
-      .named(fieldName)
+    applyVariantLogicalType(builder).named(fieldName)
   }
 
+  protected def applyVariantLogicalType(builder: 
Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = builder
+
   override def isVariantShreddingStruct(structType: StructType): Boolean = {
     SparkShreddingUtils.isVariantShreddingStruct(structType)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index a4eca808af79..7a3d8bec2403 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -22,8 +22,10 @@ import org.apache.hudi.client.model.{HoodieInternalRow, 
Spark40HoodieInternalRow
 import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
+import org.apache.hudi.common.util.{Option => HOption}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.MessageType
 import org.apache.spark.SparkEnv
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
@@ -37,11 +39,12 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, 
RebaseDateTime}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
 import org.apache.spark.sql.execution.datasources.orc.Spark40OrcReader
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark40LegacyHoodieParquetFileFormat, Spark40ParquetReader}
+import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetReadSupport, 
ParquetFileFormat, Spark40HoodieParquetReadSupport, 
Spark40LegacyHoodieParquetFileFormat, Spark40ParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.hudi.HoodieMemoryStream
@@ -195,6 +198,17 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
     Spark40ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
   }
 
+  override def createParquetReadSupport(convertTz: Option[java.time.ZoneId],
+                                        enableVectorizedReader: Boolean,
+                                        enableTimestampFieldRepair: Boolean,
+                                        datetimeRebaseSpec: RebaseSpec,
+                                        tableSchemaOpt: HOption[MessageType])
+      : HoodieParquetReadSupport = {
+    new Spark40HoodieParquetReadSupport(
+      convertTz, enableVectorizedReader, enableTimestampFieldRepair,
+      datetimeRebaseSpec, getRebaseSpec("LEGACY"), tableSchemaOpt)
+  }
+
   /**
    * TODO
    *
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
new file mode 100644
index 000000000000..02a39037822b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.common.util.{Option => HOption}
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type, 
Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.types.{StructType, VariantType}
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark 
4.1+ reads
+//  variant fields by name via SPARK-54410, so the reorder workaround below is 
no longer
+//  needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its 
converters
+//  array in hardcoded [value, metadata] order, then indexes by schema 
position. If the
+//  Parquet schema has [metadata, value] order (per spec), the positional 
mismatch causes
+//  MALFORMED_VARIANT. Workaround: reorder variant group fields to [value, 
metadata] in
+//  the requested schema. parquet-mr reconciles requested vs file schema by 
field name,
+//  so bytes flow correctly. Tracked in issue #18334.
+class Spark40HoodieParquetReadSupport(
+                                       convertTz: Option[ZoneId],
+                                       enableVectorizedReader: Boolean,
+                                       enableTimestampFieldRepair: Boolean,
+                                       datetimeRebaseSpec: RebaseSpec,
+                                       int96RebaseSpec: RebaseSpec,
+                                       tableSchemaOpt: HOption[MessageType] = 
HOption.empty())
+  extends HoodieParquetReadSupport(
+    convertTz, enableVectorizedReader, enableTimestampFieldRepair,
+    datetimeRebaseSpec, int96RebaseSpec, tableSchemaOpt) {
+
+  override def init(context: InitContext): ReadContext = {
+    val baseContext = super.init(context)
+    // Resolve the Spark catalyst requested schema so the reorder is gated on
+    // VariantType — a user struct that happens to be <value: binary, 
metadata: binary>
+    // shouldn't be silently reshuffled.
+    val sparkRequestedSchema = Option(context.getConfiguration.get(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
+      .map(StructType.fromString)
+    val reorderedSchema = Spark40HoodieParquetReadSupport.reorderVariantFields(
+      baseContext.getRequestedSchema, sparkRequestedSchema)
+    new ReadContext(reorderedSchema, baseContext.getReadSupportMetadata)
+  }
+}
+
+object Spark40HoodieParquetReadSupport {
+  /**
+   * Reorders variant group fields in the requested schema so that "value" 
precedes "metadata".
+   * This works around Spark 4.0.x's ParquetUnshreddedVariantConverter, which 
builds its
+   * converters array in hardcoded [value, metadata] order and indexes by 
schema position.
+   * parquet-mr reconciles the requested schema against the file schema by 
field name,
+   * so the correct bytes still flow to the correct converters regardless of 
file order.
+   *
+   * When a Spark catalyst schema is supplied, reorder only the top-level 
fields that are
+   * actually typed `VariantType` in catalyst; this prevents reshuffling a 
user-defined
+   * `struct<value: binary, metadata: binary>` that happens to match the 
parquet shape.
+   */
+  def reorderVariantFields(schema: MessageType, sparkSchema: 
Option[StructType] = None): MessageType = {
+    val variantFieldNames: Set[String] = sparkSchema match {
+      case Some(s) => s.fields.collect { case f if 
f.dataType.isInstanceOf[VariantType] => f.name }.toSet
+      case None => null
+    }
+    val reordered = schema.getFields.asScala.map { f =>
+      if (variantFieldNames == null || variantFieldNames.contains(f.getName)) {
+        reorderVariantType(f)
+      } else f
+    }.toArray[Type]
+    Types.buildMessage().addFields(reordered: _*).named(schema.getName)
+  }
+
+  private def reorderVariantType(t: Type): Type = {
+    t match {
+      case group: GroupType if isVariantGroup(group) =>
+        // Rebuild with [value, metadata] order for Spark compatibility
+        val valueField = group.getType("value")
+        val metadataField = group.getType("metadata")
+        group.withNewFields(java.util.Arrays.asList(valueField, metadataField))
+      case group: GroupType =>
+        // Recurse into nested groups
+        val children = group.getFields.asScala.map(reorderVariantType).asJava
+        group.withNewFields(children)
+      case _ => t
+    }
+  }
+
+  private def isVariantGroup(group: GroupType): Boolean = {
+    group.containsField("value") &&
+      group.containsField("metadata") &&
+      group.getType("value").isPrimitive &&
+      group.getType("metadata").isPrimitive &&
+      group.getType("value").asPrimitiveType().getPrimitiveTypeName == 
PrimitiveType.PrimitiveTypeName.BINARY &&
+      group.getType("metadata").asPrimitiveType().getPrimitiveTypeName == 
PrimitiveType.PrimitiveTypeName.BINARY
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
index 64ff9afdfd67..a4c006c7a7ac 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
@@ -329,7 +329,9 @@ class Spark40LegacyHoodieParquetFileFormat(private val 
shouldAppendPartitionValu
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
-        val readSupport = new HoodieParquetReadSupport(
+        // Spark40 subclass reorders variant group fields to [value, metadata] 
for Spark 4.0's
+        // positional variant converter (#18334); base class no longer applies 
the reorder.
+        val readSupport = new Spark40HoodieParquetReadSupport(
           convertTz,
           enableVectorizedReader = false,
           enableTimestampFieldRepair = true,
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
index 1516fe870057..1c70642659db 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
@@ -206,7 +206,7 @@ class Spark40ParquetReader(enableVectorizedReader: Boolean,
       }
     } else {
       // ParquetRecordReader returns InternalRow
-      val readSupport = new HoodieParquetReadSupport(
+      val readSupport = new Spark40HoodieParquetReadSupport(
         convertTz,
         enableVectorizedReader = false,
         enableLogicalTimestampRepair,
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala
new file mode 100644
index 000000000000..9d3c1fa1a109
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Types
+import org.junit.jupiter.api.{Assertions, Test}
+
+class TestSpark40HoodieParquetReadSupport {
+
+  /**
+   * Validate that reorderVariantFields does not treat groups as variant when 
the value/metadata
+   * fields fail the type checks in isVariantGroup. Each sub-group exercises a 
different false
+   * branch of the short-circuit && chain.
+   */
+  @Test
+  def testReorderVariantFieldsNonVariantGroupsUnchanged(): Unit = {
+    val schema = Types.buildMessage()
+      // value is non-primitive
+      .addField(Types.requiredGroup()
+        
.addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("value"))
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("metadata"))
+        .named("g1"))
+      // value is primitive, metadata is non-primitive
+      .addField(Types.requiredGroup()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
+        
.addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("metadata"))
+        .named("g2"))
+      // both primitive but non-BINARY
+      .addField(Types.requiredGroup()
+        .addField(Types.required(PrimitiveTypeName.INT32).named("value"))
+        .addField(Types.required(PrimitiveTypeName.INT32).named("metadata"))
+        .named("g3"))
+      // value is BINARY, metadata is non-BINARY primitive
+      .addField(Types.requiredGroup()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
+        .addField(Types.required(PrimitiveTypeName.INT32).named("metadata"))
+        .named("g4"))
+      .named("test")
+
+    val result = Spark40HoodieParquetReadSupport.reorderVariantFields(schema)
+    Assertions.assertEquals(schema, result)
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
index b910e79747da..195979548bc4 100644
--- 
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, Types}
 import org.apache.spark.SparkEnv
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
@@ -31,11 +32,13 @@ import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
ResolvedTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, CreateNamedStruct, Expression, Literal, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.variant.VariantGet
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, 
RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
@@ -49,7 +52,7 @@ import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
 import org.apache.spark.sql.hudi.blob.{BatchedBlobReaderStrategy, 
ScalarFunctions}
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, 
HoodieSpark4_1ExtendedSqlParser}
-import org.apache.spark.sql.types.{DataType, DataTypes, Metadata, 
MetadataBuilder, StructType}
+import org.apache.spark.sql.types.{DataType, DataTypes, Metadata, 
MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatchRow
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
@@ -250,6 +253,58 @@ class Spark4_1Adapter extends BaseSpark4Adapter {
     RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
   }
 
+  /**
+   * Reports whether `structType` is a variant projection struct.
+   *
+   * The decision is delegated entirely to `VariantMetadata.isVariantStruct`.
+   *
+   * @param structType struct type to inspect
+   * @return the result of `VariantMetadata.isVariantStruct(structType)`
+   */
+  override def isVariantProjectionStruct(structType: StructType): Boolean =
+    VariantMetadata.isVariantStruct(structType)
+
+  /**
+   * Builds a row-to-row projector that reshapes rows laid out as `sparkDataSchema` into the
+   * layout of `sparkRequiredSchema`, for the case where the required schema contains variant
+   * projection structs.
+   *
+   * Each required field whose type is a variant projection struct is rebuilt as a
+   * `CreateNamedStruct` holding one `VariantGet` per child, driven by the `VariantMetadata`
+   * stored on that child. Every other required field is passed through by name from the data row.
+   *
+   * NOTE(review): the returned function wraps a single `UnsafeProjection` instance; confirm
+   * whether callers need to copy the produced row before retaining it.
+   *
+   * @param sparkDataSchema     schema of the incoming rows
+   * @param sparkRequiredSchema schema the projector must produce
+   * @return `None` when no required field is a variant projection struct, otherwise the projector
+   * @throws IllegalStateException    if a required field name is absent from `sparkDataSchema`
+   * @throws IllegalArgumentException if a projected field is not a variant in `sparkDataSchema`
+   */
+  override def buildVariantProjector(sparkDataSchema: StructType,
+                                     sparkRequiredSchema: StructType): Option[InternalRow => InternalRow] = {
+    // Resolves a required field by name in the data schema. A mismatch is reported with both
+    // field lists instead of the bare IllegalArgumentException Spark's fieldIndex would raise.
+    def lookupDataField(name: String): (Int, StructField) =
+      sparkDataSchema.getFieldIndex(name) match {
+        case Some(idx) =>
+          (idx, sparkDataSchema.fields(idx))
+        case None =>
+          throw new IllegalStateException(
+            s"Required field '$name' is absent from sparkDataSchema; " +
+              s"required=${sparkRequiredSchema.fieldNames.mkString("[", ",", "]")}, " +
+              s"data=${sparkDataSchema.fieldNames.mkString("[", ",", "]")}")
+      }
+
+    // Expands a projected variant struct into alternating (child name, VariantGet) arguments
+    // of a CreateNamedStruct, all reading from the same variant column of the data row.
+    def variantStructExpr(rf: StructField, projectedStruct: StructType): Expression = {
+      val (dataIdx, dataField) = lookupDataField(rf.name)
+      require(isVariantType(dataField.dataType),
+        s"Expected VariantType for field '${rf.name}' in data schema, got ${dataField.dataType}")
+      val variantRef: Expression = BoundReference(dataIdx, dataField.dataType, dataField.nullable)
+      val nameValuePairs: Seq[Expression] = projectedStruct.fields.toSeq.flatMap { child =>
+        val vm = VariantMetadata.fromMetadata(child.metadata)
+        val pathLit = Literal(UTF8String.fromString(vm.path), DataTypes.StringType)
+        // timeZoneId may be null on the metadata; Option(...) maps that to None.
+        val tz: Option[String] = Option(vm.timeZoneId)
+        val extracted: Expression = VariantGet(variantRef, pathLit, child.dataType, vm.failOnError, tz)
+        val childNameLit = Literal(UTF8String.fromString(child.name), DataTypes.StringType)
+        Seq(childNameLit, extracted)
+      }
+      CreateNamedStruct(nameValuePairs)
+    }
+
+    // Plain column: read it straight from its position in the data row.
+    def passThroughExpr(rf: StructField): Expression = {
+      val (dataIdx, dataField) = lookupDataField(rf.name)
+      BoundReference(dataIdx, dataField.dataType, dataField.nullable)
+    }
+
+    // Quick check: a projector is only needed when some required field is a variant projection struct.
+    val needsProjection = sparkRequiredSchema.fields.exists(f => VariantMetadata.isVariantStruct(f.dataType))
+    if (needsProjection) {
+      val exprs: Array[Expression] = sparkRequiredSchema.fields.map { rf =>
+        rf.dataType match {
+          case projectedStruct: StructType if VariantMetadata.isVariantStruct(projectedStruct) =>
+            variantStructExpr(rf, projectedStruct)
+          case _ =>
+            passThroughExpr(rf)
+        }
+      }
+
+      val projection = UnsafeProjection.create(exprs.toIndexedSeq, DataTypeUtils.toAttributes(sparkDataSchema))
+      Some(row => projection(row))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Applies `LogicalTypeAnnotation.variantType((byte) 1)` to the variant group, matching
+   * parquet 1.16+'s SparkToParquetSchemaConverter convention.
+   *
+   * @param builder group builder for the variant group
+   * @return the builder returned by `builder.as(...)` with the variant annotation applied
+   */
+  override protected def applyVariantLogicalType(builder: Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = {
+    val variantAnnotation = LogicalTypeAnnotation.variantType(1.toByte)
+    builder.as(variantAnnotation)
+  }
+
   override def createMemoryStream[T: Encoder](id: Int, sparkSession: 
SparkSession): HoodieMemoryStream[T] = {
     // In Spark 4.1, MemoryStream is in 
org.apache.spark.sql.execution.streaming.runtime package
     // and takes SparkSession directly instead of SQLContext

Reply via email to