hudi-agent commented on code in PR #18938:
URL: https://github.com/apache/hudi/pull/18938#discussion_r3373486278


##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/VariantReconstruction.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.avro.VariantSchemaUtils;
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reconstructs unshredded variants when reading an already-shredded base file on the Avro
+ * ({@code HoodieRecordType.AVRO}) read path.
+ *
+ * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as
+ * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded
+ * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}}
+ * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the
+ * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this.
+ *
+ * <p>See https://github.com/apache/hudi/issues/18931.
+ */
+final class VariantReconstruction {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VariantReconstruction.class);
+
+  // Classpath fallback when the provider class config is not set; mirrors HoodieAvroFileWriterFactory.
+  private static final String[] PROVIDER_CANDIDATES = {
+      "org.apache.hudi.variant.Spark4VariantShreddingProvider"
+  };
+
+  private final HoodieSchema intermediateSchema;
+  private final Schema outputAvroSchema;
+  private final VariantShreddingProvider provider;
+  // Indexed by field position in the (requested == output) record. For target fields, the file's
+  // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets.
+  private final boolean[] isTarget;
+  private final Schema[] shreddedSubSchemas;
+  private final Schema[] unshreddedSubSchemas;
+
+  private VariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema,
+                                VariantShreddingProvider provider, boolean[] isTarget,
+                                Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) {
+    this.intermediateSchema = intermediateSchema;
+    this.outputAvroSchema = outputAvroSchema;
+    this.provider = provider;
+    this.isTarget = isTarget;
+    this.shreddedSubSchemas = shreddedSubSchemas;
+    this.unshreddedSubSchemas = unshreddedSubSchemas;
+  }
+
+  /**
+   * Schema to read the parquet file with: the requested schema, but with shredded variant columns
+   * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}.
+   */
+  HoodieSchema intermediateSchema() {
+    return intermediateSchema;
+  }
+
+  /**
+   * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when
+   * none is needed (no shredded variant columns in the file, reading shredded variants disabled, or
+   * no provider available - in which case the read proceeds unchanged).
+   */
+  static VariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) {
+    if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) {
+      return null;
+    }

Review Comment:
   🤖 When the file has shredded variants but no provider can be loaded, we log 
a warning and return null — the reader then proceeds with the unshredded 
requested schema against a file that actually has a `typed_value` column and a 
sparse/optional `value`. Won't this either fail loudly downstream or silently 
produce variants whose `value` is null for rows where data was carried in 
`typed_value`? Have you considered failing fast here so compaction/clustering 
doesn't silently mangle data?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
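
A minimal sketch of the fail-fast alternative raised in this comment. Everything here is an assumption made for illustration: `ProviderLoader`, `tryLoad`, `requireProviderIfShredded`, and the `hasShreddedVariants` flag are hypothetical names that do not appear in the PR, and plain `Class.forName` stands in for Hudi's `ReflectionUtils`.

```java
import java.util.Optional;

// Hypothetical helper, not part of the PR: illustrates failing fast when a
// file needs variant reconstruction but no provider class can be loaded.
final class ProviderLoader {

  // Tries each candidate class name in order and returns the first one that
  // can be instantiated through a no-arg constructor.
  static Optional<Object> tryLoad(String... candidates) {
    for (String name : candidates) {
      try {
        return Optional.of(Class.forName(name).getDeclaredConstructor().newInstance());
      } catch (ReflectiveOperationException | LinkageError e) {
        // Candidate is missing or not instantiable; try the next one.
      }
    }
    return Optional.empty();
  }

  // Returns null when no reconstruction is needed. When the file is shredded
  // but no provider loads, throws instead of silently reading unchanged.
  static Object requireProviderIfShredded(boolean hasShreddedVariants, String... candidates) {
    if (!hasShreddedVariants) {
      return null;
    }
    return tryLoad(candidates).orElseThrow(() -> new IllegalStateException(
        "File has shredded variant columns but no VariantShreddingProvider could be loaded"));
  }
}
```

The design choice is that "nothing to reconstruct" stays a quiet `null`, while "something to reconstruct but no way to do it" becomes an error the compaction or clustering job cannot ignore.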



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -74,6 +274,136 @@ public void addFooterMetadata(String key, String value) {
     footerMetadata.put(key, value);
   }
 
+  /**
+   * Bundles the Avro sub-schema and {@link HoodieSchema.Variant} for a shredded variant field,
+   * keyed by effective-schema field index in {@link #shreddedVariantFields}.
+   */
+  private static final class ShreddedVariantField {
+    private final Schema avroSchema;
+    private final HoodieSchema.Variant hoodieSchema;
+
+    ShreddedVariantField(Schema avroSchema, HoodieSchema.Variant hoodieSchema) {
+      this.avroSchema = avroSchema;
+      this.hoodieSchema = hoodieSchema;
+    }
+  }
+
+  private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+      "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+  /**
+   * Applies a forced shredding schema to all variant fields in the given schema.
+   * The forced schema DDL (e.g., {@code "a int, b string"}) defines the typed_value
+   * fields that will be added to each variant column.
+   */
+  private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, String ddl) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      return schema;
+    }
+
+    Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl);
+
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<HoodieSchemaField> newFields = new ArrayList<>();
+    boolean changed = false;
+
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema();
+      boolean wasNullable = fieldSchema.isNullable();
+      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
+
+      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+        HoodieSchema.Variant shreddedVariant = HoodieSchema.createVariantShreddedObject(
+            unwrapped.getAvroSchema().getName(),
+            unwrapped.getAvroSchema().getNamespace(),
+            unwrapped.getAvroSchema().getDoc(),
+            shreddedFields);
+        HoodieSchema replacement = wasNullable
+            ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
+        newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));

Review Comment:
   🤖 `field.makeNullable().withSchema(replacement)` — since `withSchema(...)` 
recreates the field from the replacement schema (and `replacement` already 
wraps in nullable when `wasNullable` was true), the intermediate 
`makeNullable()` looks like a no-op for the non-nullable case. Is the intent 
that forced-shredding always makes the variant field nullable, or was this 
meant to mirror the existing nullability? If the former, the implementation 
doesn't quite achieve that.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
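
A toy model of the concern in this comment, assuming (as the comment does, without confirmation from the PR) that `withSchema(...)` rebuilds the field purely from the replacement schema. `ToyField` and its string-encoded schema are hypothetical and are not Hudi's `HoodieSchemaField`; the sketch only shows why the order of the two calls matters under that assumption.

```java
// Toy model, not Hudi's HoodieSchemaField: shows how a later "with" call can
// overwrite what an earlier call changed.
final class ToyField {
  final String name;
  final String schema; // e.g. "variant" or "null|variant" (hypothetical encoding)

  ToyField(String name, String schema) {
    this.name = name;
    this.schema = schema;
  }

  boolean isNullable() {
    return schema.startsWith("null|");
  }

  // Wraps the current schema in a nullable union if it is not one already.
  ToyField makeNullable() {
    return isNullable() ? this : new ToyField(name, "null|" + schema);
  }

  // Replaces the schema wholesale; nothing from the old schema survives.
  ToyField withSchema(String replacement) {
    return new ToyField(name, replacement);
  }

  public static void main(String[] args) {
    ToyField field = new ToyField("v", "variant");
    // makeNullable() first: its effect is discarded by withSchema().
    System.out.println(field.makeNullable().withSchema("shredded_variant").isNullable());
    // withSchema() first: the result really is nullable.
    System.out.println(field.withSchema("shredded_variant").makeNullable().isNullable());
  }
}
```

If the intent is to mirror the existing nullability, dropping `makeNullable()` states that directly; if the intent is to always produce a nullable field, the nullable wrapping has to be applied to the replacement schema or after `withSchema(...)`.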



##########
hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Interface for shredding variant values at write time.
+ * <p>
+ * Implementations parse variant binary data (value + metadata bytes) and produce
+ * a shredded {@link GenericRecord} with typed_value columns populated according
+ * to the shredding schema.
+ * <p>
+ * This interface allows the variant binary parsing logic (which may depend on
+ * engine-specific libraries like Spark's variant module) to be loaded via reflection,
+ * keeping the core write support free of engine-specific dependencies.
+ */
+public interface VariantShreddingProvider {
+
+  /**

Review Comment:
   🤖 nit: same naming convention issue as `VariantSchemaUtils` — have you 
considered `HoodieVariantShreddingProvider`? This is a public interface in 
`hudi-common` and will be referenced by its unqualified name in reflection 
configs, so aligning with the `Hoodie` prefix now avoids a rename later.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java:
##########
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.variant;
+
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.types.variant.ShreddingUtils;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantSchema;
+import org.apache.spark.types.variant.VariantShreddingWriter;
+import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult;
+import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Implementation of {@link VariantShreddingProvider} using Spark 4's variant parsing library.
+ *
+ * <p>This class bridges the Avro record path and Spark's {@link VariantShreddingWriter}
+ * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. It converts
+ * the shredded output into Avro {@link GenericRecord}s that can be written via
+ * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p>
+ *
+ * <p>The shredding logic is delegated to {@link VariantShreddingWriter#castShredded},
+ * which handles scalar, object, and array shredding including residual value construction
+ * for non-matching fields. This class implements the {@link ShreddedResult} and
+ * {@link ShreddedResultBuilder} interfaces to collect the shredded components into
+ * Avro GenericRecords.</p>
+ */
+public class Spark4VariantShreddingProvider implements VariantShreddingProvider {
+
+  private static final String VALUE_FIELD = "value";
+  private static final String METADATA_FIELD = "metadata";
+  private static final String TYPED_VALUE_FIELD = "typed_value";
+
+  @Override
+  public GenericRecord shredVariantRecord(
+      GenericRecord unshreddedVariant,
+      Schema shreddedSchema,
+      HoodieSchema.Variant variantSchema) {
+
+    ByteBuffer valueBuf = (ByteBuffer) unshreddedVariant.get(VALUE_FIELD);
+    ByteBuffer metadataBuf = (ByteBuffer) unshreddedVariant.get(METADATA_FIELD);
+
+    if (valueBuf == null || metadataBuf == null) {
+      return null;
+    }
+
+    byte[] valueBytes = toByteArray(valueBuf);
+    byte[] metadataBytes = toByteArray(metadataBuf);
+
+    Variant variant = new Variant(valueBytes, metadataBytes);
+
+    // Build VariantSchema from the Avro shredded schema, registering
+    // Avro schemas at each level for GenericRecord construction.
+    AvroShreddedResultBuilder builder = new AvroShreddedResultBuilder();
+    VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, builder);
+
+    // Delegate to Spark's VariantShreddingWriter for the actual shredding logic.
+    AvroShreddedResult result = (AvroShreddedResult)
+        VariantShreddingWriter.castShredded(variant, sparkSchema, builder);
+
+    return result.toGenericRecord();
+  }
+
+  @Override
+  public GenericRecord rebuildVariantRecord(
+      GenericRecord shreddedVariant,

Review Comment:
   🤖 If the input `shreddedVariant` is non-null but `metadataBuf` is null, we
   return null and the caller in `VariantReconstruction.reconstruct` puts that
   null into the output record, silently losing the variant data. Since
   `metadata` is REQUIRED in the shredded parquet schema, hitting this branch
   likely indicates a malformed file. Would throwing an exception (or at
   minimum logging a warning) be safer than silently dropping the data?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
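
A minimal sketch of the guard suggested in this comment, assuming a hypothetical `MetadataGuard.requireMetadata` helper and a hypothetical `fieldName` argument used only for the error message; neither appears in the PR.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the PR: treats a missing metadata buffer
// in a shredded variant as a malformed file instead of returning null.
final class MetadataGuard {

  // Returns the buffer unchanged when present; throws when it is null.
  static ByteBuffer requireMetadata(ByteBuffer metadataBuf, String fieldName) {
    if (metadataBuf == null) {
      throw new IllegalStateException(
          "Shredded variant field '" + fieldName + "' has null metadata; the file may be malformed");
    }
    return metadataBuf;
  }
}
```

A caller would wrap the lookup, for example `requireMetadata((ByteBuffer) shreddedVariant.get("metadata"), "v")`, so that a malformed row surfaces as an error rather than a null variant in the output record.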



##########
hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shared helpers for converting between shredded and unshredded variant schemas.
+ * Used by both the write path ({@link HoodieAvroWriteSupport}) and the read path
+ * (variant reconstruction in the parquet reader).
+ */
+public class VariantSchemaUtils {
+
+  private VariantSchemaUtils() {

Review Comment:
   🤖 nit: could you rename this to `HoodieVariantSchemaUtils`? Every other 
public utility class in this package (e.g. `HoodieAvroUtils`, 
`HoodieSchemaUtils`) carries the `Hoodie` prefix, so this one stands out as 
inconsistent.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
