hudi-agent commented on code in PR #18535:
URL: https://github.com/apache/hudi/pull/18535#discussion_r3235377412
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java:
##########
@@ -157,7 +166,10 @@ private HoodieSparkLanceWriter(StoragePath file,
long flushByteWatermark) {
super(file, DEFAULT_BATCH_SIZE, allocatorSize, flushByteWatermark,
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
- this.sparkSchema = enrichSparkSchemaForLance(sparkSchema);
+ this.inputSparkSchema = sparkSchema;
+ Pair<StructType, int[]> variantPlan = enrichForLanceVariant(sparkSchema);
Review Comment:
🤖 nit: could you rename `variantPlan` to something like `variantEnrichment` or
`enrichedVariantSchema`? In Spark/Hudi, "plan" strongly implies
`LogicalPlan`/`PhysicalPlan`, so a future reader will likely do a double-take
before realising this is a `Pair<StructType, int[]>`.
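For illustration, the rename applied to the line in this hunk (any later reads
of the pair in the constructor would rename the same local):
```java
// Same call as in the diff; only the local variable's name changes,
// avoiding the LogicalPlan/PhysicalPlan connotation of "plan".
Pair<StructType, int[]> variantEnrichment = enrichForLanceVariant(sparkSchema);
```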
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/io/storage/Spark4VariantProjectedRow.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.unsafe.types.VariantVal;
+
+import java.util.List;
+import java.util.function.BiConsumer;
+
+/**
+ * Spark 4 implementation of {@link VariantProjectedRow}. Pass-through
+ * {@link InternalRow} that delegates every accessor to the wrapped input row
+ * except at variant ordinals, where it returns a pre-allocated
+ * {@code (metadata, value)} {@link GenericInternalRow} that the extractor
+ * populates on demand.
+ *
+ * <p>Single-row, single-threaded; {@link #wrap(InternalRow)} mutates state.
+ * {@link #copy()} / {@link #update(int, Object)} / {@link #setNullAt(int)}
+ * throw - lance-spark consumes each row synchronously inside
+ * {@code LanceArrowWriter.write(InternalRow)}, so no copy is ever needed.
+ */
+public abstract class Spark4VariantProjectedRow extends InternalRow implements VariantProjectedRow {
Review Comment:
🤖 nit: declaring the class `abstract` without any abstract methods will send
readers searching for the abstract method they expect to find. A sentence in
the Javadoc explaining that it is abstract only to allow a
Spark-version-specific concrete subclass in a downstream module would save
that confusion.
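For example, a sentence along these lines in the class Javadoc (wording is a
suggestion only; adjust to the actual subclassing arrangement):
```java
 * <p>Declared {@code abstract} despite having no abstract methods: the
 * instantiable type is a Spark-version-specific concrete subclass that
 * lives in a downstream module.
```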
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -82,6 +84,178 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport {
   def validateCustomTypeStructures(structType: StructType): Unit =
     validateCustomTypeStructuresRecursive(structType)
+  /**
+   * Pads partial BLOB columns - anywhere they appear in the schema tree - to the canonical
+   * 3-field layout `{type, data, reference}` so the writer's row encoder always sees the
+   * full shape. Recurses through nested `StructType`, `ArrayType`, and `MapType` to mirror
+   * the validator's coverage.
+   *
+   * RFC-100 BLOB columns are physically a 3-field struct, but for INLINE writes only
+   * `{type, data}` is meaningful and for OUT_OF_LINE writes only `{type, reference}` is
+   * meaningful. This helper accepts either partial form on input and rewrites each row to
+   * the canonical 3-field shape with `lit(null)` filling in the missing field. Null blob
+   * structs (and null array elements / map values containing blobs) round-trip as null.
+   * Already-canonical blob columns pass through unchanged (idempotent).
+   *
+   * @param df the DataFrame whose BLOB columns may be partial at any nesting depth
+   * @return the input DataFrame if no partial blob columns were found, or a projected
+   *         DataFrame with each partial blob column rewritten to canonical shape
+   */
+  def padPartialBlobColumns(df: DataFrame): DataFrame = {
+    val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+    if (!df.schema.fields.exists(f => fieldNeedsPad(f, caseSensitive))) {
+      df
+    } else {
+      val projected: Seq[Column] = df.schema.fields.map { f =>
+        if (fieldNeedsPad(f, caseSensitive)) {
+          padField(f, col(s"`${f.name}`"), caseSensitive).as(f.name, f.metadata)
+        } else {
+          col(s"`${f.name}`")
+        }
+      }
+      df.select(projected: _*)
+    }
+  }
+
+  /**
+   * Returns true if the field itself is a partial blob field that needs padding,
+   * or if any partial blob field exists somewhere inside its data type.
+   */
+  private def fieldNeedsPad(field: StructField, caseSensitive: Boolean): Boolean =
+    isPartialBlobField(field, caseSensitive) || typeNeedsPad(field.dataType, caseSensitive)
+
+  /**
+   * Returns true if the data type contains, anywhere within it, a BLOB-tagged StructField
+   * whose struct shape is a partial 2-field accepted layout.
+   */
+  private def typeNeedsPad(dataType: DataType, caseSensitive: Boolean): Boolean = dataType match {
+    case s: StructType => s.fields.exists(f => fieldNeedsPad(f, caseSensitive))
+    case ArrayType(elementType, _) => typeNeedsPad(elementType, caseSensitive)
+    case MapType(_, valueType, _) => typeNeedsPad(valueType, caseSensitive)
+    case _ => false
+  }
+
+  /**
+   * Returns true if `field` is tagged `hudi_type=BLOB` and its struct shape is one of the
+   * accepted partial layouts: `{type, data}` or `{type, reference}`. Canonical 3-field
+   * structs return false (no padding needed). Anything else also returns false - the strict
+   * validator will reject those downstream.
+   */
+  private def isPartialBlobField(field: StructField, caseSensitive: Boolean): Boolean = {
+    if (!field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) return false
+    val descriptorType = HoodieSchema
+      .parseTypeDescriptor(field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+      .getType
+    if (descriptorType != HoodieSchemaType.BLOB) return false
+    field.dataType match {
+      case st: StructType if !isCanonicalBlobStruct(st) => isAcceptedPartialBlobStruct(st, caseSensitive)
+      case _ => false
+    }
+  }
+
+  private def isAcceptedPartialBlobStruct(st: StructType, caseSensitive: Boolean): Boolean = {
+    if (st.length != 2) return false
+    val key: String => String =
+      if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
+    val names = st.fields.map(f => key(f.name)).toSet
+    val typeKey = key(HoodieSchema.Blob.TYPE)
+    val dataKey = key(HoodieSchema.Blob.INLINE_DATA_FIELD)
+    val refKey = key(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    names == Set(typeKey, dataKey) || names == Set(typeKey, refKey)
+  }
+
+  /**
+   * Builds a Column expression that rewrites the value at `sourceCol` (which has the same
+   * shape as `field.dataType`) to its post-padding canonical shape. Used by
+   * [[padPartialBlobColumns]] and recursively by itself for nested struct fields.
+   *
+   * The caller is responsible for `.as(field.name, field.metadata)` on the returned column;
+   * this method produces an unaliased value expression so it can also be used inside
+   * `transform`/`transform_values` lambdas.
+   */
+  private def padField(field: StructField, sourceCol: Column, caseSensitive: Boolean): Column = {
+    if (isPartialBlobField(field, caseSensitive)) {
+      padBlobStructValue(sourceCol, field.dataType.asInstanceOf[StructType], caseSensitive)
+    } else {
+      padDataType(field.dataType, sourceCol, caseSensitive)
+    }
+  }
+
+  /**
+   * Builds a Column expression that rewrites a value at `sourceCol` (whose shape is
+   * `dataType`) so any partial blob structs nested anywhere inside are padded to canonical.
+   * If no padding is needed inside `dataType`, returns `sourceCol` directly.
+   */
+  private def padDataType(dataType: DataType, sourceCol: Column, caseSensitive: Boolean): Column = {
+    if (!typeNeedsPad(dataType, caseSensitive)) return sourceCol
+    dataType match {
+      case s: StructType =>
+        val rebuiltFields: Seq[Column] = s.fields.map { f =>
+          val childExpr = sourceCol.getField(f.name)
+          padField(f, childExpr, caseSensitive).as(f.name)
+        }
+        // Preserve null-struct semantics: a null source struct must round-trip as null,
+        // not as a non-null struct with all-null fields produced by `struct(...)`.
+        when(sourceCol.isNull, lit(null).cast(rebuiltType(s, caseSensitive)))
+          .otherwise(struct(rebuiltFields: _*))
+
+      case ArrayType(elementType, _) =>
+        // transform() preserves the array's null-ness; the lambda handles null elements.
+        transform(sourceCol, (x: Column) => padDataType(elementType, x, caseSensitive))
+
+      case MapType(_, valueType, _) =>
+        // transform_values() preserves the map's null-ness; lambda handles null values.
+        transform_values(sourceCol, (_: Column, v: Column) => padDataType(valueType, v, caseSensitive))
+
+      case _ => sourceCol
+    }
+  }
+
+  /**
+   * Rewrites a (possibly null) blob-struct value at `blobCol` to the canonical 3-field
+   * shape, padding the missing sibling field with `lit(null)`. Preserves null-struct
+   * semantics: a null source struct round-trips as null.
+   */
+  private def padBlobStructValue(blobCol: Column, st: StructType, caseSensitive: Boolean): Column = {
+    val key: String => String =
+      if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
+    val present = st.fields.map(f => key(f.name)).toSet
+    val typeCol = blobCol.getField(HoodieSchema.Blob.TYPE).as(HoodieSchema.Blob.TYPE)
+    val dataCol = if (present.contains(key(HoodieSchema.Blob.INLINE_DATA_FIELD))) {
+      blobCol.getField(HoodieSchema.Blob.INLINE_DATA_FIELD).as(HoodieSchema.Blob.INLINE_DATA_FIELD)
+    } else {
+      lit(null).cast(BinaryType).as(HoodieSchema.Blob.INLINE_DATA_FIELD)
+    }
+    val refCol = if (present.contains(key(HoodieSchema.Blob.EXTERNAL_REFERENCE))) {
+      blobCol.getField(HoodieSchema.Blob.EXTERNAL_REFERENCE).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    } else {
+      lit(null).cast(expectedBlobReferenceStructType).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    }
+    when(blobCol.isNull, lit(null).cast(expectedBlobStructType))
+      .otherwise(struct(typeCol, dataCol, refCol))
+  }
+
+  /**
+   * Returns the post-padding DataType corresponding to `dataType`: every accepted partial
Review Comment:
🤖 nit: `rebuiltType` is a bit vague and could apply to any kind of type
reshaping. Something like `paddedBlobDataType` or `canonicalBlobDataType` would
make the direction of the transformation clearer at call sites.
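For instance, the call site in `padDataType` would then read (name change only,
behavior untouched; `canonicalBlobDataType` is the suggested name, not an
existing helper):
```scala
// Renamed from rebuiltType; returns the post-padding shape used for the null cast.
when(sourceCol.isNull, lit(null).cast(canonicalBlobDataType(s, caseSensitive)))
  .otherwise(struct(rebuiltFields: _*))
```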
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]