hudi-agent commented on code in PR #18535:
URL: https://github.com/apache/hudi/pull/18535#discussion_r3235377412


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java:
##########
@@ -157,7 +166,10 @@ private HoodieSparkLanceWriter(StoragePath file,
                                  long flushByteWatermark) {
     super(file, DEFAULT_BATCH_SIZE, allocatorSize, flushByteWatermark,
         bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
-    this.sparkSchema = enrichSparkSchemaForLance(sparkSchema);
+    this.inputSparkSchema = sparkSchema;
+    Pair<StructType, int[]> variantPlan = enrichForLanceVariant(sparkSchema);

Review Comment:
   🤖 nit: could you rename variantPlan to something like variantEnrichment or enrichedVariantSchema? In Spark/Hudi, plan strongly implies LogicalPlan/PhysicalPlan, so a future reader will likely do a double-take before realising this is a Pair<StructType, int[]>.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/io/storage/Spark4VariantProjectedRow.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.unsafe.types.VariantVal;
+
+import java.util.List;
+import java.util.function.BiConsumer;
+
+/**
+ * Spark 4 implementation of {@link VariantProjectedRow}. Pass-through
+ * {@link InternalRow} that delegates every accessor to the wrapped input row
+ * except at variant ordinals, where it returns a pre-allocated
+ * {@code (metadata, value)} {@link GenericInternalRow} that the extractor
+ * populates on demand.
+ *
+ * <p>Single-row, single-threaded; {@link #wrap(InternalRow)} mutates state.
+ * {@link #copy()} / {@link #update(int, Object)} / {@link #setNullAt(int)}
+ * throw - lance-spark consumes each row synchronously inside
+ * {@code LanceArrowWriter.write(InternalRow)}, so no copy is ever needed.
+ */
+public abstract class Spark4VariantProjectedRow extends InternalRow implements VariantProjectedRow {

Review Comment:
   🤖 nit: declaring the class abstract without any abstract methods will send readers searching for the abstract method they expect to find. A sentence in the Javadoc explaining that it is abstract to allow a Spark-version-specific concrete subclass in a downstream module would save that confusion.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
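For readers skimming the thread, the pass-through behaviour described in the Javadoc above can be sketched generically. All names below are illustrative stand-ins, not the PR's API: the real class wraps a Spark `InternalRow` and substitutes a pre-allocated `(metadata, value)` `GenericInternalRow` at the variant ordinals.

```java
import java.util.List;

// Sketch of the delegation pattern: every read goes to the wrapped row
// except at one designated ordinal, where a pre-allocated replacement
// value is returned instead.
final class ProjectedRowSketch {
  private List<Object> wrapped;      // stands in for the wrapped InternalRow
  private final int variantOrdinal;  // ordinal to intercept
  private final Object replacement;  // pre-allocated substitute value

  ProjectedRowSketch(int variantOrdinal, Object replacement) {
    this.variantOrdinal = variantOrdinal;
    this.replacement = replacement;
  }

  // Mirrors wrap(InternalRow): mutates state, hence single-row/single-threaded.
  ProjectedRowSketch wrap(List<Object> row) {
    this.wrapped = row;
    return this;
  }

  // Mirrors the accessors: delegate everywhere except the variant ordinal.
  Object get(int ordinal) {
    return ordinal == variantOrdinal ? replacement : wrapped.get(ordinal);
  }
}
```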



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -82,6 +84,178 @@ object HoodieSparkSchemaConverters extends SparkAdapterSupport {
   def validateCustomTypeStructures(structType: StructType): Unit =
     validateCustomTypeStructuresRecursive(structType)
 
+  /**
+   * Pads partial BLOB columns - anywhere they appear in the schema tree - to the canonical
+   * 3-field layout `{type, data, reference}` so the writer's row encoder always sees the
+   * full shape. Recurses through nested `StructType`, `ArrayType`, and `MapType` to mirror
+   * the validator's coverage.
+   *
+   * RFC-100 BLOB columns are physically a 3-field struct, but for INLINE writes only
+   * `{type, data}` is meaningful and for OUT_OF_LINE writes only `{type, reference}` is
+   * meaningful. This helper accepts either partial form on input and rewrites each row to
+   * the canonical 3-field shape with `lit(null)` filling in the missing field. Null blob
+   * structs (and null array elements / map values containing blobs) round-trip as null.
+   * Already-canonical blob columns pass through unchanged (idempotent).
+   *
+   * @param df the DataFrame whose BLOB columns may be partial at any nesting depth
+   * @return the input DataFrame if no partial blob columns were found, or a projected
+   *         DataFrame with each partial blob column rewritten to canonical shape
+   */
+  def padPartialBlobColumns(df: DataFrame): DataFrame = {
+    val caseSensitive = SQLConf.get.caseSensitiveAnalysis
+    if (!df.schema.fields.exists(f => fieldNeedsPad(f, caseSensitive))) {
+      df
+    } else {
+      val projected: Seq[Column] = df.schema.fields.map { f =>
+        if (fieldNeedsPad(f, caseSensitive)) {
+          padField(f, col(s"`${f.name}`"), caseSensitive).as(f.name, f.metadata)
+        } else {
+          col(s"`${f.name}`")
+        }
+      }
+      df.select(projected: _*)
+    }
+  }
+
+  /**
+   * Returns true if the field itself is a partial blob field that needs padding,
+   * or if any partial blob field exists somewhere inside its data type.
+   */
+  private def fieldNeedsPad(field: StructField, caseSensitive: Boolean): Boolean =
+    isPartialBlobField(field, caseSensitive) || typeNeedsPad(field.dataType, caseSensitive)
+
+  /**
+   * Returns true if the data type contains, anywhere within it, a BLOB-tagged StructField
+   * whose struct shape is a partial 2-field accepted layout.
+   */
+  private def typeNeedsPad(dataType: DataType, caseSensitive: Boolean): Boolean = dataType match {
+    case s: StructType => s.fields.exists(f => fieldNeedsPad(f, caseSensitive))
+    case ArrayType(elementType, _) => typeNeedsPad(elementType, caseSensitive)
+    case MapType(_, valueType, _) => typeNeedsPad(valueType, caseSensitive)
+    case _ => false
+  }
+
+  /**
+   * Returns true if `field` is tagged `hudi_type=BLOB` and its struct shape is one of the
+   * accepted partial layouts: `{type, data}` or `{type, reference}`. Canonical 3-field
+   * structs return false (no padding needed). Anything else also returns false - the strict
+   * validator will reject those downstream.
+   */
+  private def isPartialBlobField(field: StructField, caseSensitive: Boolean): Boolean = {
+    if (!field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD)) return false
+    val descriptorType = HoodieSchema
+      .parseTypeDescriptor(field.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+      .getType
+    if (descriptorType != HoodieSchemaType.BLOB) return false
+    field.dataType match {
+      case st: StructType if !isCanonicalBlobStruct(st) => isAcceptedPartialBlobStruct(st, caseSensitive)
+      case _ => false
+    }
+  }
+
+  private def isAcceptedPartialBlobStruct(st: StructType, caseSensitive: Boolean): Boolean = {
+    if (st.length != 2) return false
+    val key: String => String =
+      if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
+    val names = st.fields.map(f => key(f.name)).toSet
+    val typeKey = key(HoodieSchema.Blob.TYPE)
+    val dataKey = key(HoodieSchema.Blob.INLINE_DATA_FIELD)
+    val refKey = key(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    names == Set(typeKey, dataKey) || names == Set(typeKey, refKey)
+  }
+
+  /**
+   * Builds a Column expression that rewrites the value at `sourceCol` (which has the same
+   * shape as `field.dataType`) to its post-padding canonical shape. Used by
+   * [[padPartialBlobColumns]] and recursively by itself for nested struct fields.
+   *
+   * The caller is responsible for `.as(field.name, field.metadata)` on the returned column;
+   * this method produces an unaliased value expression so it can also be used inside
+   * `transform`/`transform_values` lambdas.
+   */
+  private def padField(field: StructField, sourceCol: Column, caseSensitive: Boolean): Column = {
+    if (isPartialBlobField(field, caseSensitive)) {
+      padBlobStructValue(sourceCol, field.dataType.asInstanceOf[StructType], caseSensitive)
+    } else {
+      padDataType(field.dataType, sourceCol, caseSensitive)
+    }
+  }
+
+  /**
+   * Builds a Column expression that rewrites a value at `sourceCol` (whose shape is
+   * `dataType`) so any partial blob structs nested anywhere inside are padded to canonical.
+   * If no padding is needed inside `dataType`, returns `sourceCol` directly.
+   */
+  private def padDataType(dataType: DataType, sourceCol: Column, caseSensitive: Boolean): Column = {
+    if (!typeNeedsPad(dataType, caseSensitive)) return sourceCol
+    dataType match {
+      case s: StructType =>
+        val rebuiltFields: Seq[Column] = s.fields.map { f =>
+          val childExpr = sourceCol.getField(f.name)
+          padField(f, childExpr, caseSensitive).as(f.name)
+        }
+        // Preserve null-struct semantics: a null source struct must round-trip as null,
+        // not as a non-null struct with all-null fields produced by `struct(...)`.
+        when(sourceCol.isNull, lit(null).cast(rebuiltType(s, caseSensitive)))
+          .otherwise(struct(rebuiltFields: _*))
+
+      case ArrayType(elementType, _) =>
+        // transform() preserves the array's null-ness; the lambda handles null elements.
+        transform(sourceCol, (x: Column) => padDataType(elementType, x, caseSensitive))
+
+      case MapType(_, valueType, _) =>
+        // transform_values() preserves the map's null-ness; lambda handles null values.
+        transform_values(sourceCol, (_: Column, v: Column) => padDataType(valueType, v, caseSensitive))
+
+      case _ => sourceCol
+    }
+  }
+
+  /**
+   * Rewrites a (possibly null) blob-struct value at `blobCol` to the canonical 3-field
+   * shape, padding the missing sibling field with `lit(null)`. Preserves null-struct
+   * semantics: a null source struct round-trips as null.
+   */
+  private def padBlobStructValue(blobCol: Column, st: StructType, caseSensitive: Boolean): Column = {
+    val key: String => String =
+      if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
+    val present = st.fields.map(f => key(f.name)).toSet
+    val typeCol = blobCol.getField(HoodieSchema.Blob.TYPE).as(HoodieSchema.Blob.TYPE)
+    val dataCol = if (present.contains(key(HoodieSchema.Blob.INLINE_DATA_FIELD))) {
+      blobCol.getField(HoodieSchema.Blob.INLINE_DATA_FIELD).as(HoodieSchema.Blob.INLINE_DATA_FIELD)
+    } else {
+      lit(null).cast(BinaryType).as(HoodieSchema.Blob.INLINE_DATA_FIELD)
+    }
+    val refCol = if (present.contains(key(HoodieSchema.Blob.EXTERNAL_REFERENCE))) {
+      blobCol.getField(HoodieSchema.Blob.EXTERNAL_REFERENCE).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    } else {
+      lit(null).cast(expectedBlobReferenceStructType).as(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+    }
+    when(blobCol.isNull, lit(null).cast(expectedBlobStructType))
+      .otherwise(struct(typeCol, dataCol, refCol))
+  }
+
+  /**
+   * Returns the post-padding DataType corresponding to `dataType`: every accepted partial

Review Comment:
   🤖 nit: rebuiltType is a bit vague and could apply to any kind of type reshaping. Something like paddedBlobDataType or canonicalBlobDataType would make the direction of the transformation clearer at call sites.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
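The accepted-partial rule and null-padding behaviour documented on `padPartialBlobColumns` can be illustrated with a standalone sketch, using plain Scala collections in place of Spark Columns. Field names mirror the RFC-100 blob layout, but everything below is an illustrative assumption, not the PR's code:

```scala
object BlobPadSketch {
  // Canonical RFC-100 blob layout and the two accepted partial layouts.
  val Canonical: Set[String] = Set("type", "data", "reference")
  private val InlinePartial = Set("type", "data")
  private val OutOfLinePartial = Set("type", "reference")

  // Mirrors isAcceptedPartialBlobStruct: only the {type, data} or
  // {type, reference} field sets qualify (compared case-insensitively here).
  def isAcceptedPartial(fieldNames: Set[String]): Boolean = {
    val names = fieldNames.map(_.toLowerCase)
    names == InlinePartial || names == OutOfLinePartial
  }

  // Mirrors padBlobStructValue on a single row: pad a partial blob "struct"
  // to the canonical 3-key shape, filling missing siblings with null. A null
  // struct round-trips as null, and canonical input passes through unchanged.
  def pad(row: Map[String, Any]): Map[String, Any] =
    if (row == null) null
    else Canonical.iterator.map(k => k -> row.getOrElse(k, null)).toMap
}
```

Note that, as in the PR, the canonical 3-field shape never changes: padding is idempotent, so applying `pad` to already-canonical input is a no-op.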



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
