This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2929f7dea38d2919ed6c98b7adccbcdff2526ff8 Author: voonhous <[email protected]> AuthorDate: Fri May 15 05:09:27 2026 +0800 fix(variant): align Spark 4.1 MOR merge with PushVariantIntoScan and restore Spark 4.0 reads (#18674) Co-authored-by: Y Ethan Guo <[email protected]> --- .../hudi/io/storage/HoodieSparkParquetReader.java | 11 +- .../SparkFileFormatInternalRowReaderContext.scala | 76 +++++++++++++- .../sql/avro/HoodieSparkSchemaConverters.scala | 20 +++- .../parquet/HoodieParquetReadSupport.scala | 48 +-------- .../org/apache/spark/sql/hudi/SparkAdapter.scala | 40 ++++++- .../parquet/TestHoodieParquetReadSupport.scala | 34 ------ .../hudi/common/engine/HoodieReaderContext.java | 10 ++ .../table/read/buffer/FileGroupRecordBuffer.java | 27 ++++- .../buffer/PositionBasedFileGroupRecordBuffer.java | 10 +- .../avro/AvroSchemaConverterWithTimestampNTZ.java | 11 +- .../HoodieFileGroupReaderBasedFileFormat.scala | 23 ++++- .../sql/hudi/dml/schema/TestVariantDataType.scala | 10 -- .../spark/sql/adapter/BaseSpark4Adapter.scala | 19 +++- .../apache/spark/sql/adapter/Spark4_0Adapter.scala | 16 ++- .../parquet/Spark40HoodieParquetReadSupport.scala | 115 +++++++++++++++++++++ .../Spark40LegacyHoodieParquetFileFormat.scala | 4 +- .../datasources/parquet/Spark40ParquetReader.scala | 2 +- .../TestSpark40HoodieParquetReadSupport.scala | 59 +++++++++++ .../apache/spark/sql/adapter/Spark4_1Adapter.scala | 59 ++++++++++- 19 files changed, 474 insertions(+), 120 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java index 80ed9be8420e..d72409064a09 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java @@ -187,10 +187,13 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader { String.valueOf(storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(), sqlConf.parquetRecordFilterEnabled()))); }); } - ParquetReader<InternalRow> reader = ParquetReader.builder(new HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true, - rebaseDateSpec, - SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"), messageSchema), - new Path(path.toUri())) + // Via SparkAdapter so Spark 4.0 picks up its variant-reordering ReadSupport subclass + // (#18334); constructing the base class here would MALFORMED_VARIANT on Spark 4.0. + HoodieParquetReadSupport readSupport = SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetReadSupport( + Option$.MODULE$.empty(), true, true, + rebaseDateSpec, + messageSchema); + ParquetReader<InternalRow> reader = ParquetReader.builder(readSupport, new Path(path.toUri())) .withConf(storage.getConf().unwrapAs(Configuration.class)) .build(); UnsafeProjection projection = evolution.generateUnsafeProjection(); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index 92b963f68390..edfcc44e9aa2 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME +import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, Pair => HPair} import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader, VectorConversionUtils} @@ -38,11 +39,14 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection} import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{ArrayType, ByteType, DoubleType, FloatType, LongType, MetadataBuilder, StructField, StructType} import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import java.util.function.{Function => JFunction} + import scala.collection.JavaConverters._ /** @@ -61,14 +65,62 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR filters: Seq[Filter], requiredFilters: Seq[Filter], storageConfiguration: StorageConfiguration[_], - tableConfig: HoodieTableConfig) + tableConfig: HoodieTableConfig, + sparkRequiredSchema: Option[StructType] = None) extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig, SparkFileFormatInternalRecordContext.apply(tableConfig)) { + + // Java-friendly auxiliary constructor (Scala default args don't generate matching Java overloads). + def this(baseFileReader: SparkColumnarFileReader, + filters: Seq[Filter], + requiredFilters: Seq[Filter], + storageConfiguration: StorageConfiguration[_], + tableConfig: HoodieTableConfig) = + this(baseFileReader, filters, requiredFilters, storageConfiguration, tableConfig, None) + lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter private lazy val recordKeyFields = Option(tableConfig.getRecordKeyFields.orElse(null)).map(_.map(_.toLowerCase).toSet).getOrElse(Set.empty) private lazy val bootstrapSafeFilters: Seq[Filter] = filters.filter(filterIsSafeForBootstrap) ++ requiredFilters private lazy val morFilters = filters.filter(filterIsSafeForPrimaryKey(_, recordKeyFields)) ++ requiredFilters private lazy val allFilters = filters ++ requiredFilters + // For each field of `target`, replace its dataType with the matching field's projected + // variant struct from `source` (when present). Non-matching fields pass through. + private def overlayVariantProjections(target: StructType, source: StructType): StructType = { + StructType(target.fields.map { f => + SparkFileFormatInternalRowReaderContext.findFieldByName(source, f.name).map(_.dataType) match { + case Some(projStruct: StructType) if sparkAdapter.isVariantProjectionStruct(projStruct) => + f.copy(dataType = projStruct) + case _ => f + } + }) + } + + // Aligns log-block records with the PushVariantIntoScan-projected variant shape before + // they reach the merger. Preserves merger metadata cols (_hoodie_record_key, + // _tmp_metadata_row_index) which the merger reads by ordinal — projecting down to the + // bare required schema would drop them and the merger would read garbage offsets. + override def getLogBlockRecordProjection( + dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow, InternalRow]] = { + val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f => f.dataType match { + case st: StructType => sparkAdapter.isVariantProjectionStruct(st) + case _ => false + })) + if (!needsProjection) { + return HOption.empty[JFunction[InternalRow, InternalRow]]() + } + val req = sparkRequiredSchema.get + val dataStruct = HoodieInternalRowUtils.getCachedSchema(dataBlockSchema) + val targetStruct = overlayVariantProjections(dataStruct, req) + sparkAdapter.buildVariantProjector(dataStruct, targetStruct) match { + case Some(p) => HOption.of(new JFunction[InternalRow, InternalRow] { + // .copy() because the buffer stores rows into ExternalSpillableMap and + // UnsafeProjection reuses a single output buffer. + override def apply(r: InternalRow): InternalRow = p(r).copy() + }) + case None => HOption.empty[JFunction[InternalRow, InternalRow]]() + } + } + override def getFileRecordIterator(filePath: StoragePath, start: Long, length: Long, @@ -79,7 +131,14 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR if (hasRowIndexField) { assert(getRecordContext.supportsParquetRowIndex()) } - val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema) + // Use the engine's augmented requiredSchema (includes merger metadata cols the merger + // reads from base rows), but overlay the projected variant shape from sparkRequiredSchema + // so parquet-mr's PushVariantIntoScan kicks in (HoodieSchema collapses the projected + // struct back to VariantType, dropping the VariantMetadata parquet-mr looks for). + val structType = sparkRequiredSchema match { + case Some(sparkReq) => overlayVariantProjections(HoodieInternalRowUtils.getCachedSchema(requiredSchema), sparkReq) + case None => HoodieInternalRowUtils.getCachedSchema(requiredSchema) + } // Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY, so the reader needs BinaryType // and we decode back to ArrayType below. Lance returns ArrayType natively, so skip @@ -100,8 +159,10 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR val (readSchema, readFilters) = getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField) if (FSUtils.isLogFile(filePath)) { // NOTE: now only primary key based filtering is supported for log files + // Variant alignment happens later via getLogBlockRecordProjection in the merge buffer. new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath) - .asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema, readFilters.asJava).asInstanceOf[ClosableIterator[InternalRow]] + .asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema, readFilters.asJava) + .asInstanceOf[ClosableIterator[InternalRow]] } else { // partition value is empty because the spark parquet reader will append the partition columns to // each row if they are given. That is the only usage of the partition values in the reader. @@ -274,6 +335,15 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR } object SparkFileFormatInternalRowReaderContext { + /** Look up a field by name, honoring `spark.sql.caseSensitive`. */ + private[hudi] def findFieldByName(schema: StructType, name: String): Option[StructField] = { + if (SQLConf.get.caseSensitiveAnalysis) { + schema.fields.find(_.name == name) + } else { + schema.fields.find(_.name.equalsIgnoreCase(name)) + } + } + // From "namedExpressions.scala": Used to construct to record position field metadata. private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = "__file_source_generated_metadata_col" private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col" diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala index f634b773f7ea..c1c3fe1f6837 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.avro -import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport} import org.apache.hudi.common.schema.{HoodieJsonProperties, HoodieSchema, HoodieSchemaField, HoodieSchemaType} import org.apache.hudi.common.schema.HoodieSchema.TimePrecision import org.apache.hudi.internal.schema.HoodieSchemaException @@ -190,6 +190,12 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { isCanonicalVariantStruct(variantStruct) => HoodieSchema.createVariant(recordName, nameSpace, null) + // PushVariantIntoScan (Spark 4.1+) rewrites Variant to a struct of extractions; map + // it back to a regular HoodieSchema Variant. parquet-mr does the projection natively + // from the Spark required schema's VariantMetadata. + case projectedVariant: StructType if isSparkVariantProjectionStruct(projectedVariant) => + HoodieSchema.createVariant(recordName, nameSpace, null) + case st: StructType => val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName @@ -514,6 +520,18 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport { } } + /** + * Detects a Spark 4.1 PushVariantIntoScan-projected struct. Short-circuits before consulting + * the version-specific SparkAdapter so shared-module unit tests (whose classpath lacks any + * SparkXAdapter) don't trigger an adapter-load failure on plain StructType conversions. + */ + private def isSparkVariantProjectionStruct(st: StructType): Boolean = { + if (!HoodieSparkUtils.gteqSpark4_1) return false + if (st.fields.forall(_.metadata == Metadata.empty)) return false + try sparkAdapter.isVariantProjectionStruct(st) + catch { case _: NoClassDefFoundError | _: ClassNotFoundException => false } + } + private def sparkTypeForVectorElementType( elementType: HoodieSchema.Vector.VectorElementType): DataType = elementType match { case HoodieSchema.Vector.VectorElementType.FLOAT => FloatType diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala index 5d77bd2fa3d7..c63ec0eef9e4 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala @@ -24,7 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils import org.apache.parquet.hadoop.api.InitContext import org.apache.parquet.hadoop.api.ReadSupport.ReadContext -import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, SchemaRepair, Type, Types} +import org.apache.parquet.schema.{GroupType, MessageType, SchemaRepair, Type, Types} import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import java.time.ZoneId @@ -50,15 +50,7 @@ class HoodieParquetReadSupport( readContext.getRequestedSchema } val trimmedParquetSchema = HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema, context.getFileSchema) - // TODO: Remove this workaround once Spark is bumped to 4.1+, which reads variant fields by - // name via SPARK-54410. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its converters - // array in hardcoded [value, metadata] order, then indexes by schema position. If the Parquet - // schema has [metadata, value] order (per spec), the positional mismatch causes - // MALFORMED_VARIANT. Workaround: reorder variant group fields to [value, metadata] in the - // requested schema. parquet-mr reconciles requested vs file schema by field name, so bytes - // flow correctly. This is tracked in issue #18334 - val reorderedSchema = HoodieParquetReadSupport.reorderVariantFields(trimmedParquetSchema) - new ReadContext(reorderedSchema, readContext.getReadSupportMetadata) + new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata) } } @@ -83,42 +75,6 @@ object HoodieParquetReadSupport { Types.buildMessage().addFields(trimmedFields: _*).named(requestedSchema.getName) } - /** - * Reorders variant group fields in the requested schema so that "value" precedes "metadata". - * This works around Spark 4.0.x's ParquetUnshreddedVariantConverter, which builds its - * converters array in hardcoded [value, metadata] order and indexes by schema position. - * parquet-mr reconciles the requested schema against the file schema by field name, - * so the correct bytes still flow to the correct converters regardless of file order. - */ - def reorderVariantFields(schema: MessageType): MessageType = { - val reordered = schema.getFields.asScala.map(reorderVariantType).toArray[Type] - Types.buildMessage().addFields(reordered: _*).named(schema.getName) - } - - private def reorderVariantType(t: Type): Type = { - t match { - case group: GroupType if isVariantGroup(group) => - // Rebuild with [value, metadata] order for Spark compatibility - val valueField = group.getType("value") - val metadataField = group.getType("metadata") - group.withNewFields(java.util.Arrays.asList(valueField, metadataField)) - case group: GroupType => - // Recurse into nested groups - val children = group.getFields.asScala.map(reorderVariantType).asJava - group.withNewFields(children) - case _ => t - } - } - - private def isVariantGroup(group: GroupType): Boolean = { - group.containsField("value") && - group.containsField("metadata") && - group.getType("value").isPrimitive && - group.getType("metadata").isPrimitive && - group.getType("value").asPrimitiveType().getPrimitiveTypeName == PrimitiveType.PrimitiveTypeName.BINARY && - group.getType("metadata").asPrimitiveType().getPrimitiveTypeName == PrimitiveType.PrimitiveTypeName.BINARY - } - private def trimParquetType(requestedType: Type, fileType: Type): Option[Type] = { if (requestedType.equals(fileType)) { Some(requestedType) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 4eb680e1efc7..2934536f4ec9 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit +import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.storage.StorageConfiguration import org.apache.hadoop.conf.Configuration @@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateFormatter import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetFilters} +import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetReadSupport, ParquetFileFormat, ParquetFilters} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.parser.HoodieExtendedParserInterface import org.apache.spark.sql.sources.{BaseRelation, Filter} @@ -242,6 +243,21 @@ trait SparkAdapter extends Serializable { options: Map[String, String], hadoopConf: Configuration): Option[SparkColumnarFileReader] + /** + * Build the [[HoodieParquetReadSupport]] for a parquet read. Spark 4.0 overrides to return + * its variant-aware subclass (variant group field reorder for the positional converter). + * int96 rebase mode is fixed to LEGACY (Hudi convention for timestamp compatibility). + */ + def createParquetReadSupport(convertTz: Option[java.time.ZoneId], + enableVectorizedReader: Boolean, + enableTimestampFieldRepair: Boolean, + datetimeRebaseSpec: RebaseSpec, + tableSchemaOpt: HOption[MessageType]) + : HoodieParquetReadSupport = { + new HoodieParquetReadSupport(convertTz, enableVectorizedReader, enableTimestampFieldRepair, + datetimeRebaseSpec, getRebaseSpec("LEGACY"), tableSchemaOpt) + } + /** * use new qe execute */ @@ -466,6 +482,28 @@ trait SparkAdapter extends Serializable { */ def isVariantShreddingStruct(structType: StructType): Boolean + /** + * Checks if a StructType is the result of Spark 4.1's PushVariantIntoScan rewriting — i.e., + * every child field carries `VariantMetadata` describing a pushed-down variant extraction. + * + * Returns false on Spark versions earlier than 4.1 (the rewriting only happens there). + */ + def isVariantProjectionStruct(structType: StructType): Boolean = false + + /** + * If `sparkRequiredSchema` contains any field that's a Spark 4.1 variant projection struct + * (i.e., the same-named field in `sparkDataSchema` is `VariantType`), returns a row + * transformer that takes an InternalRow in the data-schema shape (with full variants) and + * produces an InternalRow in the required-schema shape (with each variant column projected + * to its requested struct via VariantGet). + * + * Used on the MOR log-file path: log records carry the full variant on disk, but the merger + * expects rows aligned to the post-PushVariantIntoScan required schema. Returns None when + * there's nothing to project (cheap fast-path for Spark < 4.1 and for non-variant queries). + */ + def buildVariantProjector(sparkDataSchema: StructType, + sparkRequiredSchema: StructType): Option[InternalRow => InternalRow] = None + /** * Generates a shredded Variant schema and marks it with write shredding metadata. * diff --git a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala index 6c2ec5b725cd..973454d1b70c 100644 --- a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala +++ b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala @@ -113,38 +113,4 @@ class TestHoodieParquetReadSupport { .named("required") Assertions.assertEquals(expectedSchema, trimmedSchema) } - - /** - * Validate that reorderVariantFields does not treat groups as variant when the value/metadata - * fields fail the type checks in isVariantGroup. Each sub-group exercises a different false - * branch of the short-circuit && chain (lines 116-119). - */ - @Test - def testReorderVariantFields_nonVariantGroupsUnchanged(): Unit = { - val schema = Types.buildMessage() - // value is non-primitive → line 116 false - .addField(Types.requiredGroup() - .addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("value")) - .addField(Types.required(PrimitiveTypeName.BINARY).named("metadata")) - .named("g1")) - // value is primitive, metadata is non-primitive → line 117 false - .addField(Types.requiredGroup() - .addField(Types.required(PrimitiveTypeName.BINARY).named("value")) - .addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("metadata")) - .named("g2")) - // both primitive but non-BINARY → line 118 false - .addField(Types.requiredGroup() - .addField(Types.required(PrimitiveTypeName.INT32).named("value")) - .addField(Types.required(PrimitiveTypeName.INT32).named("metadata")) - .named("g3")) - // value is BINARY, metadata is non-BINARY primitive → line 119 false - .addField(Types.requiredGroup() - .addField(Types.required(PrimitiveTypeName.BINARY).named("value")) - .addField(Types.required(PrimitiveTypeName.INT32).named("metadata")) - .named("g4")) - .named("test") - - val result = HoodieParquetReadSupport.reorderVariantFields(schema) - Assertions.assertEquals(schema, result) - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index 393dc8e789f6..953014802cf2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -56,6 +56,7 @@ import org.apache.hudi.storage.StoragePathInfo; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.function.Function; import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY; import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY; @@ -383,6 +384,15 @@ public abstract class HoodieReaderContext<T> { HoodieSchema dataRequiredSchema, List<Pair<String, Object>> requiredPartitionFieldAndValues); + /** + * Optional per-row transformer applied to log-block records before they reach the merger. + * Engines override this to align records with a projected read schema (e.g. Spark 4.1's + * PushVariantIntoScan). Default is no projection. + */ + public Option<Function<T, T>> getLogBlockRecordProjection(HoodieSchema dataBlockSchema) { + return Option.empty(); + } + public Option<Pair<String, String>> getPayloadClasses(TypedProperties props) { return getRecordMerger().map(merger -> { if (merger.getMergingStrategy().equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID)) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java index a0eef45b137b..702ecf5041e0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java @@ -199,9 +199,8 @@ abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T } else { blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext); } - Pair<Function<T, T>, HoodieSchema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock); - return Pair.of(new CloseableMappingIterator<>( - blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), schemaTransformerWithEvolvedSchema.getRight()); + Pair<Function<T, T>, HoodieSchema> projectedTransformer = getProjectedTransformer(dataBlock); + return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator, projectedTransformer.getLeft()), projectedTransformer.getRight()); } catch (IOException e) { throw new HoodieIOException("Failed to deser records from log files ", e); } @@ -281,6 +280,28 @@ abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T return Pair.of(transformer, evolvedSchema); } + /** + * Composes schema evolution then the engine's optional log-block record projection + * (currently only Spark 4.1's PushVariantIntoScan). Returns the evolved data-block schema + * — the projector preserves field shape, only rewriting variant fields, so merger + * metadata cols (read by ordinal) stay intact. + * + * <p>Skipped when a custom payload class is configured: {@code PayloadUpdateProcessor} + * round-trips through {@code convertToAvroRecord} against a schema that still types + * variant fields as {@code VariantType}, which would mis-decode rewritten rows. + */ + protected Pair<Function<T, T>, HoodieSchema> getProjectedTransformer(HoodieDataBlock dataBlock) { + Pair<Function<T, T>, HoodieSchema> evolved = getSchemaTransformerWithEvolvedSchema(dataBlock); + if (payloadClasses.isPresent()) { + return evolved; + } + Option<Function<T, T>> logProjOpt = readerContext.getLogBlockRecordProjection(evolved.getRight()); + if (!logProjOpt.isPresent()) { + return evolved; + } + return Pair.of(evolved.getLeft().andThen(logProjOpt.get()), evolved.getRight()); + } + private static class LogRecordIterator<T> implements ClosableIterator<BufferedRecord<T>> { private final FileGroupRecordBuffer<T> fileGroupRecordBuffer; private final Iterator<BufferedRecord<T>> logRecordIterator; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java index 18f9acc43511..73dd5dda5e87 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java @@ -126,9 +126,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco partialUpdateModeOpt); } - Pair<Function<T, T>, HoodieSchema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock); + Pair<Function<T, T>, HoodieSchema> projectedTransformer = getProjectedTransformer(dataBlock); - HoodieSchema schema = HoodieSchemaCache.intern(schemaTransformerWithEvolvedSchema.getRight()); + HoodieSchema schema = HoodieSchemaCache.intern(projectedTransformer.getRight()); // TODO: Return an iterator that can generate sequence number with the record. // Then we can hide this logic into data block. @@ -144,9 +144,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco } long recordPosition = recordPositions.get(recordIndex++); - T evolvedNextRecord = schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord); - boolean isDelete = readerContext.getRecordContext().isDeleteRecord(evolvedNextRecord, deleteContext); - BufferedRecord<T> bufferedRecord = BufferedRecords.fromEngineRecord(evolvedNextRecord, schema, readerContext.getRecordContext(), orderingFieldNames, isDelete); + T projectedNextRecord = projectedTransformer.getLeft().apply(nextRecord); + boolean isDelete = readerContext.getRecordContext().isDeleteRecord(projectedNextRecord, deleteContext); + BufferedRecord<T> bufferedRecord = BufferedRecords.fromEngineRecord(projectedNextRecord, schema, readerContext.getRecordContext(), orderingFieldNames, isDelete); processNextDataRecord(bufferedRecord, recordPosition); } } diff --git a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java index 8895186fd055..c28a9a83c278 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java +++ b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.schema.HoodieSchemaType; +import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.schema.ConversionPatterns; @@ -77,6 +78,7 @@ import static org.apache.parquet.schema.Type.Repetition.REPEATED; * This was taken from parquet-java 1.13.1 AvroSchemaConverter and modified * to support local timestamp types by copying a few methods from 1.14.0 AvroSchemaConverter. */ +@Slf4j @SuppressWarnings("all") public class AvroSchemaConverterWithTimestampNTZ extends HoodieAvroParquetSchemaConverter { @@ -478,7 +480,14 @@ public class AvroSchemaConverterWithTimestampNTZ extends HoodieAvroParquetSchema public java.util.Optional<HoodieSchema> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { return java.util.Optional.of(HoodieSchema.create(HoodieSchemaType.STRING)); } - }).orElseThrow(() -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType)); + }).orElseGet(() -> { + // Unrecognized annotation (e.g., parquet 1.16.0+ VariantLogicalTypeAnnotation, not + // available on the parquet version we compile against). Fall back to record + // conversion, correct for the variant binary group (`metadata`, `value`) we write. + log.debug("Unrecognized parquet LogicalTypeAnnotation '{}' on group '{}', falling back to record conversion", + logicalTypeAnnotation, parquetGroupType.getName()); + return convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names); + }); } else { // if no original type then it's a record return convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 3da22ff8ebe7..de4ffb400d4c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -151,6 +151,21 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, supportVectorizedRead = false supportReturningBatch = false false + } else if (schema.fields.exists(f => f.dataType.isInstanceOf[StructType] + && sparkAdapter.isVariantProjectionStruct(f.dataType.asInstanceOf[StructType]))) { + // Spark 4.1's PushVariantIntoScan rewrites a variant column to a struct of pushed-down + // extractions. The Spark vectorized parquet reader treats this as a nested type change + // (data column is VariantType, required is a struct) and refuses to read in vectorized + // mode (ParquetSchemaEvolutionUtils throws). Force row-based reading on this path. + supportVectorizedRead = false + supportReturningBatch = false + false + } else if (HoodieSparkUtils.gteqSpark4_1 && schema.fields.exists(f => sparkAdapter.isVariantType(f.dataType))) { + // #18605: Spark 4.1's vectorized variant read produces UnsafeRow encodings that SIGBUS + // during RangePartitioner sampling. Force row-based reads. Spark 4.0 unaffected. + supportVectorizedRead = false + supportReturningBatch = false + false } else { val conf = sparkSession.sessionState.conf val parquetBatchSupported = ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && supportBatchWithTableSchema @@ -173,7 +188,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, } supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch supportReturningBatch = !isMOR && supportVectorizedRead - logInfo(s"supportReturningBatch: $supportReturningBatch, supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " + + logDebug(s"supportReturningBatch: $supportReturningBatch, supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, " + s"isBootstrap: $isBootstrap, superSupportBatch: $supportBatch") supportReturningBatch } @@ -222,7 +237,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val superSplitable = super.isSplitable(sparkSession, options, path) val isLance = hoodieFileFormat == HoodieFileFormat.LANCE val splitable = !isMOR && !isIncremental && !isBootstrap && !isLance && superSplitable - logInfo(s"isSplitable: $splitable, super.isSplitable: $superSplitable, isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap") + logDebug(s"isSplitable: $splitable, super.isSplitable: $superSplitable, isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap") splitable } @@ -286,7 +301,9 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)) fileSliceMapping.getSlice(fileGroupName) match { case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) => - val readerContext = new SparkFileFormatInternalRowReaderContext(fileGroupBaseFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig) + val readerContext = new SparkFileFormatInternalRowReaderContext( + fileGroupBaseFileReader.value, filters, requiredFilters, storageConf, metaClient.getTableConfig, + sparkRequiredSchema = Some(requiredSchema)) readerContext.setEnableLogicalTimestampFieldRepair(storageConf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true)) val props = metaClient.getTableConfig.getProps options.foreach(kv => props.setProperty(kv._1, kv._2)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala index 56bb3270b5a1..e364772e0596 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala @@ -33,20 +33,10 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, LongType, MapType, MetadataBuilder, StringType, StructField, StructType} -import org.scalatest.{Canceled, Outcome} class TestVariantDataType extends HoodieSparkSqlTestBase { - // TODO(#18605): Re-enable after fixing JVM SIGSEGV crash on Spark 4.1 - override def withFixture(test: NoArgTest): Outcome = { - if (HoodieSparkUtils.gteqSpark4_1) { - Canceled("Disabled on Spark 4.1 due to JVM SIGSEGV crash in variant data type tests") - } else { - super.withFixture(test) - } - } - test(s"Test Table with Variant Data Type") { // Variant type is only supported in Spark 4.0+ assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher") diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala index cca901a7d362..ea6e96943a69 100644 --- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala @@ -24,7 +24,7 @@ import org.apache.hudi.common.util.JsonUtils import org.apache.hudi.spark.internal.ReflectUtil import org.apache.hudi.storage.StorageConfiguration -import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types} +import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type, Types} import org.apache.parquet.schema.Type.Repetition import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging @@ -198,6 +198,14 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging { (requiredType, fileType) match { case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) => Some(true) case (s: StructType, _: VariantType) if isVariantPhysicalSchema(s) => Some(true) + // Spark 4.1's PushVariantIntoScan rewrites a `v: VariantType` column into a + // pushed-down projection struct (each child carries `VariantMetadata`). When the file + // stores `v` as a real Variant, the projection struct is NOT a type change — parquet-mr + // reads the variant natively and projects per-row using the field metadata. Treat the + // pair as compatible so Hudi's schema-change machinery doesn't rewrite the requested + // schema back to `VariantType` (which would lose the projection metadata). + case (s: StructType, _: VariantType) if isVariantProjectionStruct(s) => Some(true) + case (_: VariantType, s: StructType) if isVariantProjectionStruct(s) => Some(true) case _ => None // Not a VariantType comparison, use default logic } } @@ -245,13 +253,16 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging { // VariantType is always stored in Parquet as a struct with separate value and metadata binary fields. // This matches how the HoodieRowParquetWriteSupport writes variant data. // Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs. - // TODO: use `.as(LogicalTypeAnnotation.variantType())` after parquet-java version is bumped to 1.16.0 - Types.buildGroup(repetition) + // The variant LogicalTypeAnnotation is applied via applyVariantLogicalType, Spark 4.0 (parquet 1.15.2) + // is a no-op since the annotation only exists in parquet 1.16.0+; Spark 4.1 overrides to apply it. + val builder = Types.buildGroup(repetition) .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Repetition.REQUIRED).named(HoodieSchema.Variant.VARIANT_METADATA_FIELD)) .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, valueRepetition).named(HoodieSchema.Variant.VARIANT_VALUE_FIELD)) - .named(fieldName) + applyVariantLogicalType(builder).named(fieldName) } + protected def applyVariantLogicalType(builder: Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = builder + override def isVariantShreddingStruct(structType: StructType): Boolean = { SparkShreddingUtils.isVariantShreddingStruct(structType) } diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala index a4eca808af79..7a3d8bec2403 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala @@ -22,8 +22,10 @@ import org.apache.hudi.client.model.{HoodieInternalRow, Spark40HoodieInternalRow import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit +import org.apache.hudi.common.util.{Option => HOption} import org.apache.hadoop.conf.Configuration +import org.apache.parquet.schema.MessageType import org.apache.spark.SparkEnv import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql._ @@ -37,11 +39,12 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime} +import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase import org.apache.spark.sql.execution.datasources.orc.Spark40OrcReader -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark40LegacyHoodieParquetFileFormat, Spark40ParquetReader} +import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetReadSupport, ParquetFileFormat, Spark40HoodieParquetReadSupport, Spark40LegacyHoodieParquetFileFormat, Spark40ParquetReader} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.hudi.HoodieMemoryStream @@ -195,6 +198,17 @@ class Spark4_0Adapter extends BaseSpark4Adapter { Spark40ParquetReader.build(vectorized, sqlConf, options, hadoopConf) } + override def createParquetReadSupport(convertTz: Option[java.time.ZoneId], + enableVectorizedReader: Boolean, + enableTimestampFieldRepair: Boolean, + datetimeRebaseSpec: RebaseSpec, + tableSchemaOpt: HOption[MessageType]) + : HoodieParquetReadSupport = { + new Spark40HoodieParquetReadSupport( + convertTz, enableVectorizedReader, enableTimestampFieldRepair, + datetimeRebaseSpec, getRebaseSpec("LEGACY"), tableSchemaOpt) + } + /** * TODO * diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala new file mode 100644 index 000000000000..02a39037822b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hudi.common.util.{Option => HOption} + +import org.apache.parquet.hadoop.api.InitContext +import org.apache.parquet.hadoop.api.ReadSupport.ReadContext +import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type, Types} +import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec +import org.apache.spark.sql.types.{StructType, VariantType} + +import java.time.ZoneId + +import scala.collection.JavaConverters._ + +// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark 4.1+ reads +// variant fields by name via SPARK-54410, so the reorder workaround below is no longer +// needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its converters +// array in hardcoded [value, metadata] order, then indexes by schema position. If the +// Parquet schema has [metadata, value] order (per spec), the positional mismatch causes +// MALFORMED_VARIANT. Workaround: reorder variant group fields to [value, metadata] in +// the requested schema. parquet-mr reconciles requested vs file schema by field name, +// so bytes flow correctly. Tracked in issue #18334. +class Spark40HoodieParquetReadSupport( + convertTz: Option[ZoneId], + enableVectorizedReader: Boolean, + enableTimestampFieldRepair: Boolean, + datetimeRebaseSpec: RebaseSpec, + int96RebaseSpec: RebaseSpec, + tableSchemaOpt: HOption[MessageType] = HOption.empty()) + extends HoodieParquetReadSupport( + convertTz, enableVectorizedReader, enableTimestampFieldRepair, + datetimeRebaseSpec, int96RebaseSpec, tableSchemaOpt) { + + override def init(context: InitContext): ReadContext = { + val baseContext = super.init(context) + // Resolve the Spark catalyst requested schema so the reorder is gated on + // VariantType — a user struct that happens to be <value: binary, metadata: binary> + // shouldn't be silently reshuffled. + val sparkRequestedSchema = Option(context.getConfiguration.get( + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) + .map(StructType.fromString) + val reorderedSchema = Spark40HoodieParquetReadSupport.reorderVariantFields( + baseContext.getRequestedSchema, sparkRequestedSchema) + new ReadContext(reorderedSchema, baseContext.getReadSupportMetadata) + } +} + +object Spark40HoodieParquetReadSupport { + /** + * Reorders variant group fields in the requested schema so that "value" precedes "metadata". + * This works around Spark 4.0.x's ParquetUnshreddedVariantConverter, which builds its + * converters array in hardcoded [value, metadata] order and indexes by schema position. + * parquet-mr reconciles the requested schema against the file schema by field name, + * so the correct bytes still flow to the correct converters regardless of file order. + * + * When a Spark catalyst schema is supplied, reorder only the top-level fields that are + * actually typed `VariantType` in catalyst; this prevents reshuffling a user-defined + * `struct<value: binary, metadata: binary>` that happens to match the parquet shape. + */ + def reorderVariantFields(schema: MessageType, sparkSchema: Option[StructType] = None): MessageType = { + val variantFieldNames: Set[String] = sparkSchema match { + case Some(s) => s.fields.collect { case f if f.dataType.isInstanceOf[VariantType] => f.name }.toSet + case None => null + } + val reordered = schema.getFields.asScala.map { f => + if (variantFieldNames == null || variantFieldNames.contains(f.getName)) { + reorderVariantType(f) + } else f + }.toArray[Type] + Types.buildMessage().addFields(reordered: _*).named(schema.getName) + } + + private def reorderVariantType(t: Type): Type = { + t match { + case group: GroupType if isVariantGroup(group) => + // Rebuild with [value, metadata] order for Spark compatibility + val valueField = group.getType("value") + val metadataField = group.getType("metadata") + group.withNewFields(java.util.Arrays.asList(valueField, metadataField)) + case group: GroupType => + // Recurse into nested groups + val children = group.getFields.asScala.map(reorderVariantType).asJava + group.withNewFields(children) + case _ => t + } + } + + private def isVariantGroup(group: GroupType): Boolean = { + group.containsField("value") && + group.containsField("metadata") && + group.getType("value").isPrimitive && + group.getType("metadata").isPrimitive && + group.getType("value").asPrimitiveType().getPrimitiveTypeName == PrimitiveType.PrimitiveTypeName.BINARY && + group.getType("metadata").asPrimitiveType().getPrimitiveTypeName == PrimitiveType.PrimitiveTypeName.BINARY + } +} diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala index 64ff9afdfd67..a4c006c7a7ac 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala @@ -329,7 +329,9 @@ class Spark40LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu } } else { logDebug(s"Falling back to parquet-mr") - val readSupport = new HoodieParquetReadSupport( + // Spark40 subclass reorders variant group fields to [value, metadata] for Spark 4.0's + // positional variant converter (#18334); base class no longer applies the reorder. + val readSupport = new Spark40HoodieParquetReadSupport( convertTz, enableVectorizedReader = false, enableTimestampFieldRepair = true, diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala index 1516fe870057..1c70642659db 100644 --- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala @@ -206,7 +206,7 @@ class Spark40ParquetReader(enableVectorizedReader: Boolean, } } else { // ParquetRecordReader returns InternalRow - val readSupport = new HoodieParquetReadSupport( + val readSupport = new Spark40HoodieParquetReadSupport( convertTz, enableVectorizedReader = false, enableLogicalTimestampRepair, diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala new file mode 100644 index 000000000000..9d3c1fa1a109 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Types +import org.junit.jupiter.api.{Assertions, Test} + +class TestSpark40HoodieParquetReadSupport { + + /** + * Validate that reorderVariantFields does not treat groups as variant when the value/metadata + * fields fail the type checks in isVariantGroup. Each sub-group exercises a different false + * branch of the short-circuit && chain. + */ + @Test + def testReorderVariantFieldsNonVariantGroupsUnchanged(): Unit = { + val schema = Types.buildMessage() + // value is non-primitive + .addField(Types.requiredGroup() + .addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("value")) + .addField(Types.required(PrimitiveTypeName.BINARY).named("metadata")) + .named("g1")) + // value is primitive, metadata is non-primitive + .addField(Types.requiredGroup() + .addField(Types.required(PrimitiveTypeName.BINARY).named("value")) + .addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("metadata")) + .named("g2")) + // both primitive but non-BINARY + .addField(Types.requiredGroup() + .addField(Types.required(PrimitiveTypeName.INT32).named("value")) + .addField(Types.required(PrimitiveTypeName.INT32).named("metadata")) + .named("g3")) + // value is BINARY, metadata is non-BINARY primitive + .addField(Types.requiredGroup() + .addField(Types.required(PrimitiveTypeName.BINARY).named("value")) + .addField(Types.required(PrimitiveTypeName.INT32).named("metadata")) + .named("g4")) + .named("test") + + val result = Spark40HoodieParquetReadSupport.reorderVariantFields(schema) + Assertions.assertEquals(schema, result) + } +} diff --git a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala index b910e79747da..195979548bc4 100644 --- a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala @@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit import org.apache.hadoop.conf.Configuration +import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, Types} import org.apache.spark.SparkEnv import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql._ @@ -31,11 +32,13 @@ import org.apache.spark.sql.avro._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, CreateNamedStruct, Expression, Literal, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.variant.VariantGet import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.Origin +import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, RebaseDateTime} import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback} import org.apache.spark.sql.execution.datasources._ @@ -49,7 +52,7 @@ import org.apache.spark.sql.hudi.analysis.TableValuedFunctions import org.apache.spark.sql.hudi.blob.{BatchedBlobReaderStrategy, ScalarFunctions} import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark4_1ExtendedSqlParser} -import org.apache.spark.sql.types.{DataType, DataTypes, Metadata, MetadataBuilder, StructType} +import org.apache.spark.sql.types.{DataType, DataTypes, Metadata, MetadataBuilder, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatchRow import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ @@ -250,6 +253,58 @@ class Spark4_1Adapter extends BaseSpark4Adapter { RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy)) } + override def isVariantProjectionStruct(structType: StructType): Boolean = { + VariantMetadata.isVariantStruct(structType) + } + + override def buildVariantProjector(sparkDataSchema: StructType, + sparkRequiredSchema: StructType): Option[InternalRow => InternalRow] = { + // Quick check: any required field a variant projection struct? + if (!sparkRequiredSchema.fields.exists(f => VariantMetadata.isVariantStruct(f.dataType))) { + None + } else { + // Surface mismatched schemas with both field lists rather than Spark's bare + // IllegalArgumentException from fieldIndex. + def lookupDataField(name: String): (Int, StructField) = { + val idx = sparkDataSchema.getFieldIndex(name).getOrElse( + throw new IllegalStateException( + s"Required field '$name' is absent from sparkDataSchema; " + + s"required=${sparkRequiredSchema.fieldNames.mkString("[", ",", "]")}, " + + s"data=${sparkDataSchema.fieldNames.mkString("[", ",", "]")}")) + (idx, sparkDataSchema.fields(idx)) + } + val exprs: Array[Expression] = sparkRequiredSchema.fields.map { rf => + rf.dataType match { + case projectedStruct: StructType if VariantMetadata.isVariantStruct(projectedStruct) => + val (dataIdx, dataField) = lookupDataField(rf.name) + require(isVariantType(dataField.dataType), + s"Expected VariantType for field '${rf.name}' in data schema, got ${dataField.dataType}") + val variantRef: Expression = BoundReference(dataIdx, dataField.dataType, dataField.nullable) + val childExprs: Seq[Expression] = projectedStruct.fields.toSeq.flatMap { child => + val vm = VariantMetadata.fromMetadata(child.metadata) + val pathLit = Literal(UTF8String.fromString(vm.path), DataTypes.StringType) + val tz: Option[String] = Option(vm.timeZoneId) + val variantGet: Expression = VariantGet(variantRef, pathLit, child.dataType, vm.failOnError, tz) + Seq(Literal(UTF8String.fromString(child.name), DataTypes.StringType), variantGet) + } + CreateNamedStruct(childExprs) + case _ => + val (dataIdx, dataField) = lookupDataField(rf.name) + BoundReference(dataIdx, dataField.dataType, dataField.nullable) + } + } + + val projection = UnsafeProjection.create(exprs.toIndexedSeq, DataTypeUtils.toAttributes(sparkDataSchema)) + Some(row => projection(row)) + } + } + + // Apply LogicalTypeAnnotation.variantType((byte) 1) to the variant group, matching parquet 1.16+'s + // SparkToParquetSchemaConverter convention. + override protected def applyVariantLogicalType(builder: Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = { + builder.as(LogicalTypeAnnotation.variantType(1.toByte)) + } + override def createMemoryStream[T: Encoder](id: Int, sparkSession: SparkSession): HoodieMemoryStream[T] = { // In Spark 4.1, MemoryStream is in org.apache.spark.sql.execution.streaming.runtime package // and takes SparkSession directly instead of SQLContext
