yihua commented on code in PR #18674:
URL: https://github.com/apache/hudi/pull/18674#discussion_r3243682450
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java:
##########
@@ -187,10 +187,14 @@ public ClosableIterator<UnsafeRow>
getUnsafeRowIterator(HoodieSchema requestedSc
String.valueOf(storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(),
sqlConf.parquetRecordFilterEnabled())));
});
}
- ParquetReader<InternalRow> reader = ParquetReader.builder(new
HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
- rebaseDateSpec,
-
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"),
messageSchema),
- new Path(path.toUri()))
+ // Via SparkAdapter so Spark 4.0 picks up its variant-reordering
ReadSupport subclass
+ // (#18334); constructing the base class here would MALFORMED_VARIANT on
Spark 4.0.
+ HoodieParquetReadSupport readSupport =
SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetReadSupport(
+ Option$.MODULE$.empty(), true, true,
+ rebaseDateSpec,
+ SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"),
Review Comment:
This could be folded into the adapter implementation, since
`createParquetReadSupport` is a new adapter method.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -466,6 +481,28 @@ trait SparkAdapter extends Serializable {
*/
def isVariantShreddingStruct(structType: StructType): Boolean
+ /**
+ * Checks if a StructType is the result of Spark 4.1's PushVariantIntoScan
rewriting — i.e.,
+ * every child field carries `VariantMetadata` describing a pushed-down
variant extraction.
+ *
+ * Returns false on Spark versions earlier than 4.1 (the rewriting only
happens there).
+ */
+ def isVariantProjectionStruct(structType: StructType): Boolean = false
+
+ /**
+ * If `sparkRequiredSchema` contains any field that's a Spark 4.1 variant
projection struct
+ * (i.e., the same-named field in `sparkDataSchema` is `VariantType`),
returns a row
+ * transformer that takes an InternalRow in the data-schema shape (with full
variants) and
+ * produces an InternalRow in the required-schema shape (with each variant
column projected
+ * to its requested struct via VariantGet).
+ *
+ * Used on the MOR log-file path: log records carry the full variant on
disk, but the merger
+ * expects rows aligned to the post-PushVariantIntoScan required schema.
Returns None when
+ * there's nothing to project (cheap fast-path for Spark < 4.1 and for
non-variant queries).
+ */
+ def buildVariantProjector(sparkDataSchema: StructType,
+ sparkRequiredSchema: StructType):
Option[InternalRow => InternalRow] = None
Review Comment:
We need to revisit whether this impacts performance and see whether such a
projection can be avoided through a custom variant reader.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -61,14 +65,63 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
filters: Seq[Filter],
requiredFilters: Seq[Filter],
storageConfiguration:
StorageConfiguration[_],
- tableConfig: HoodieTableConfig)
+ tableConfig: HoodieTableConfig,
+ sparkDataSchema:
Option[StructType] = None,
Review Comment:
`sparkDataSchema` is not used and should be removed.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -242,6 +242,21 @@ trait SparkAdapter extends Serializable {
options: Map[String, String],
hadoopConf: Configuration):
Option[SparkColumnarFileReader]
+ /**
+ * Build the [[HoodieParquetReadSupport]] for a parquet read. Spark 4.0
overrides to return
+ * its variant-aware subclass (variant group field reorder for the
positional converter).
+ */
+ def createParquetReadSupport(convertTz: Option[java.time.ZoneId],
+ enableVectorizedReader: Boolean,
+ enableTimestampFieldRepair: Boolean,
+ datetimeRebaseSpec:
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec,
+ int96RebaseSpec:
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec,
+ tableSchemaOpt:
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType])
Review Comment:
nit: import the classes (`RebaseSpec`, `Option => HOption`, `MessageType`)
instead of using their fully qualified names.
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala:
##########
@@ -195,6 +195,18 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
Spark40ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
}
+ override def createParquetReadSupport(convertTz: Option[java.time.ZoneId],
+ enableVectorizedReader: Boolean,
+ enableTimestampFieldRepair: Boolean,
+ datetimeRebaseSpec:
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec,
+ int96RebaseSpec:
org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec,
+ tableSchemaOpt:
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType])
+ :
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetReadSupport = {
+ new
org.apache.spark.sql.execution.datasources.parquet.Spark40HoodieParquetReadSupport(
+ convertTz, enableVectorizedReader, enableTimestampFieldRepair,
+ datetimeRebaseSpec, int96RebaseSpec, tableSchemaOpt)
Review Comment:
nit: Import the classes and avoid package prefix. Use `Option => HOption`
for org.apache.hudi.common.util.Option to avoid conflict with Scala Option.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -514,6 +520,18 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
}
}
+ /**
+ * Detects a Spark 4.1 PushVariantIntoScan-projected struct. Short-circuits
before consulting
+ * the version-specific SparkAdapter so shared-module unit tests (whose
classpath lacks any
+ * SparkXAdapter) don't trigger an adapter-load failure on plain StructType
conversions.
+ */
+ private def isSparkVariantProjectionStruct(st: StructType): Boolean = {
+ if (!HoodieSparkUtils.gteqSpark4_1) return false
+ if (!st.fields.exists(_.metadata != Metadata.empty)) return false
Review Comment:
nit: could you flip this to `if (st.fields.forall(_.metadata ==
Metadata.empty)) return false`? The double-negative `!exists(!= empty)` takes a
moment to mentally resolve compared to `forall(== empty)`.
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type,
Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark
4.1+ reads
+// variant fields by name via SPARK-54410, so the reorder workaround below is
no longer
+// needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its
converters
+// array in hardcoded [value, metadata] order, then indexes by schema
position. If the
+// Parquet schema has [metadata, value] order (per spec), the positional
mismatch causes
+// MALFORMED_VARIANT. Workaround: reorder variant group fields to [value,
metadata] in
+// the requested schema. parquet-mr reconciles requested vs file schema by
field name,
+// so bytes flow correctly. Tracked in issue #18334.
+class Spark40HoodieParquetReadSupport(
+ convertTz: Option[ZoneId],
+ enableVectorizedReader: Boolean,
+ enableTimestampFieldRepair: Boolean,
+ datetimeRebaseSpec: RebaseSpec,
+ int96RebaseSpec: RebaseSpec,
+ tableSchemaOpt:
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] =
org.apache.hudi.common.util.Option.empty())
Review Comment:
nit: same here on fixing imports: use "org.apache.hudi.common.util.Option =>
HOption" and import MessageType instead of fully qualifying it.
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type,
Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark
4.1+ reads
+// variant fields by name via SPARK-54410, so the reorder workaround below is
no longer
+// needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its
converters
+// array in hardcoded [value, metadata] order, then indexes by schema
position. If the
+// Parquet schema has [metadata, value] order (per spec), the positional
mismatch causes
+// MALFORMED_VARIANT. Workaround: reorder variant group fields to [value,
metadata] in
+// the requested schema. parquet-mr reconciles requested vs file schema by
field name,
+// so bytes flow correctly. Tracked in issue #18334.
+class Spark40HoodieParquetReadSupport(
+ convertTz: Option[ZoneId],
+ enableVectorizedReader: Boolean,
+ enableTimestampFieldRepair: Boolean,
+ datetimeRebaseSpec: RebaseSpec,
+ int96RebaseSpec: RebaseSpec,
+ tableSchemaOpt:
org.apache.hudi.common.util.Option[org.apache.parquet.schema.MessageType] =
org.apache.hudi.common.util.Option.empty())
+ extends HoodieParquetReadSupport(
+ convertTz, enableVectorizedReader, enableTimestampFieldRepair,
+ datetimeRebaseSpec, int96RebaseSpec, tableSchemaOpt) {
+
+ override def init(context: InitContext): ReadContext = {
+ val baseContext = super.init(context)
+ val reorderedSchema = Spark40HoodieParquetReadSupport.reorderVariantFields(
+ baseContext.getRequestedSchema)
+ new ReadContext(reorderedSchema, baseContext.getReadSupportMetadata)
+ }
+}
+
+object Spark40HoodieParquetReadSupport {
+ /**
+ * Reorders variant group fields in the requested schema so that "value"
precedes "metadata".
+ * This works around Spark 4.0.x's ParquetUnshreddedVariantConverter, which
builds its
+ * converters array in hardcoded [value, metadata] order and indexes by
schema position.
+ * parquet-mr reconciles the requested schema against the file schema by
field name,
+ * so the correct bytes still flow to the correct converters regardless of
file order.
+ */
+ def reorderVariantFields(schema: MessageType): MessageType = {
+ val reordered =
schema.getFields.asScala.map(reorderVariantType).toArray[Type]
+ Types.buildMessage().addFields(reordered: _*).named(schema.getName)
+ }
+
+ private def reorderVariantType(t: Type): Type = {
+ t match {
+ case group: GroupType if isVariantGroup(group) =>
+ // Rebuild with [value, metadata] order for Spark compatibility
+ val valueField = group.getType("value")
+ val metadataField = group.getType("metadata")
+ group.withNewFields(java.util.Arrays.asList(valueField, metadataField))
+ case group: GroupType =>
+ // Recurse into nested groups
+ val children = group.getFields.asScala.map(reorderVariantType).asJava
+ group.withNewFields(children)
+ case _ => t
+ }
+ }
+
+ private def isVariantGroup(group: GroupType): Boolean = {
+ group.containsField("value") &&
+ group.containsField("metadata") &&
+ group.getType("value").isPrimitive &&
+ group.getType("metadata").isPrimitive &&
+ group.getType("value").asPrimitiveType().getPrimitiveTypeName ==
PrimitiveType.PrimitiveTypeName.BINARY &&
+ group.getType("metadata").asPrimitiveType().getPrimitiveTypeName ==
PrimitiveType.PrimitiveTypeName.BINARY
+ }
Review Comment:
nit: can this also match a non-variant `GroupType` that happens to have the
same schema (primitive binary fields named `value` and `metadata`)?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala:
##########
@@ -61,14 +65,63 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
filters: Seq[Filter],
requiredFilters: Seq[Filter],
storageConfiguration:
StorageConfiguration[_],
- tableConfig: HoodieTableConfig)
+ tableConfig: HoodieTableConfig,
+ sparkDataSchema:
Option[StructType] = None,
+ sparkRequiredSchema:
Option[StructType] = None)
extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig,
SparkFileFormatInternalRecordContext.apply(tableConfig)) {
+
+ // Java-friendly auxiliary constructor (Scala default args don't generate
matching Java overloads).
+ def this(baseFileReader: SparkColumnarFileReader,
+ filters: Seq[Filter],
+ requiredFilters: Seq[Filter],
+ storageConfiguration: StorageConfiguration[_],
+ tableConfig: HoodieTableConfig) =
+ this(baseFileReader, filters, requiredFilters, storageConfiguration,
tableConfig, None, None)
+
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private lazy val recordKeyFields =
Option(tableConfig.getRecordKeyFields.orElse(null)).map(_.map(_.toLowerCase).toSet).getOrElse(Set.empty)
private lazy val bootstrapSafeFilters: Seq[Filter] =
filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
private lazy val morFilters = filters.filter(filterIsSafeForPrimaryKey(_,
recordKeyFields)) ++ requiredFilters
private lazy val allFilters = filters ++ requiredFilters
+ // For each field of `target`, replace its dataType with the matching
field's projected
+ // variant struct from `source` (when present). Non-matching fields pass
through.
+ private def overlayVariantProjections(target: StructType, source:
StructType): StructType = {
+ StructType(target.fields.map { f =>
+ SparkFileFormatInternalRowReaderContext.findFieldByName(source,
f.name).map(_.dataType) match {
+ case Some(projStruct: StructType) if
sparkAdapter.isVariantProjectionStruct(projStruct) =>
+ f.copy(dataType = projStruct)
+ case _ => f
+ }
+ })
+ }
+
+ // Aligns log-block records with the PushVariantIntoScan-projected variant
shape before
+ // they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
+ // _tmp_metadata_row_index) which the merger reads by ordinal — projecting
down to the
+ // bare required schema would drop them and the merger would read garbage
offsets.
+ override def getLogBlockRecordProjection(
+ dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow,
InternalRow]] = {
+ val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f =>
f.dataType match {
+ case st: StructType => sparkAdapter.isVariantProjectionStruct(st)
+ case _ => false
+ }))
+ if (!needsProjection) {
+ return HOption.empty[JFunction[InternalRow, InternalRow]]()
+ }
+ val req = sparkRequiredSchema.get
+ val dataStruct = HoodieInternalRowUtils.getCachedSchema(dataBlockSchema)
+ val targetStruct = overlayVariantProjections(dataStruct, req)
+ sparkAdapter.buildVariantProjector(dataStruct, targetStruct) match {
+ case Some(p) => HOption.of(new JFunction[InternalRow, InternalRow] {
+ // .copy() because the buffer stores rows into ExternalSpillableMap and
+ // UnsafeProjection reuses a single output buffer.
+ override def apply(r: InternalRow): InternalRow = p(r).copy()
Review Comment:
@voonhous Could you check why `copy()` is required? The log record reader
should already copy each record when putting it into the log record map.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]