hudi-agent commented on code in PR #18065:
URL: https://github.com/apache/hudi/pull/18065#discussion_r3427592171


##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -20,38 +20,289 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
 
 /**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriterSupport for plugging in the bloom filter and variant shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider} loaded via reflection.</p>
  */
 public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
 
   private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;
   private final Map<String, String> footerMetadata = new HashMap<>();
   protected final Properties properties;
 
+  /**
+   * Whether variant write shredding is enabled via config.
+   */
+  private final boolean variantWriteShreddingEnabled;
+
+  /**
+   * The effective (possibly shredded) HoodieSchema used for writing.
+   */
+  private final HoodieSchema effectiveHoodieSchema;

Review Comment:
   🤖 nit: `effectiveHoodieSchema` is assigned in the constructor but never read 
elsewhere — only `effectiveAvroSchema` is used by `shredRecord`. Could you drop 
the field (and the accompanying javadoc) to avoid future readers wondering what 
it's for?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java:
##########
@@ -140,9 +142,42 @@ private HoodieAvroWriteSupport getHoodieAvroWriteSupport(HoodieSchema schema,
                                                            StorageConfiguration storageConf,
                                                            boolean enableBloomFilter) {
     Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
+    HoodieSchema effectiveSchema = HoodieAvroWriteSupport.generateEffectiveSchema(schema, config);
+    // Work on a copy so we never mutate the shared config's internal Properties.
+    Properties props = TypedProperties.copy(config.getProps());
+    // Auto-detect variant shredding provider from classpath if not explicitly configured
+    if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
+      String detectedClass = detectShreddingProviderClass();
+      if (detectedClass != null) {
+        props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), detectedClass);
+      }
+    }
     return (HoodieAvroWriteSupport) ReflectionUtils.loadClass(
         config.getStringOrDefault(HoodieStorageConfig.HOODIE_AVRO_WRITE_SUPPORT_CLASS),
         new Class<?>[] {MessageType.class, HoodieSchema.class, Option.class, Properties.class},
-        getAvroSchemaConverter((Configuration) storageConf.unwrapAs(Configuration.class)).convert(schema), schema, filter, config.getProps());
+        // Build the Parquet schema from the effective (possibly shredded) schema so the message type
+        // matches the records actually written - a shredded variant has a nullable value and a
+        // typed_value column; converting the original schema would mark value REQUIRED and drop
+        // typed_value, failing the write with "Null-value for required field: value".
+        getAvroSchemaConverter((Configuration) storageConf.unwrapAs(Configuration.class)).convert(effectiveSchema), schema, filter, props);
+  }
+  }
+
+  // Sole provider today: variant shredding currently requires Spark 4.0+. If more providers are
+  // added, restore a candidate list here and try each in turn.
+  private static final String SPARK4_VARIANT_SHREDDING_PROVIDER =
+      "org.apache.hudi.variant.Spark4VariantShreddingProvider";

Review Comment:
   🤖 nit: this `private static final` constant is declared between two methods 
— could you move it up next to the other class-level declarations so it's 
easier to spot when scanning the file?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -74,6 +325,179 @@ public void addFooterMetadata(String key, String value) {
     footerMetadata.put(key, value);
   }
 
+  /**
+   * Bundles the Avro sub-schema and {@link HoodieSchema.Variant} for a shredded variant field,
+   * keyed by effective-schema field index in {@link #shreddedVariantFields}.
+   */
+  private static final class ShreddedVariantField {
+    private final Schema avroSchema;
+    private final HoodieSchema.Variant hoodieSchema;
+
+    ShreddedVariantField(Schema avroSchema, HoodieSchema.Variant hoodieSchema) {
+      this.avroSchema = avroSchema;
+      this.hoodieSchema = hoodieSchema;
+    }
+  }
+
+  private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+      "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+  /**
+   * Applies a forced shredding schema to all variant fields in the given schema.
+   * The forced schema DDL (e.g., {@code "a int, b string"}) defines the typed_value
+   * fields that will be added to each variant column.
+   */
+  private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, String ddl) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      return schema;
+    }
+
+    Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl);
+
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<HoodieSchemaField> newFields = new ArrayList<>();
+    boolean changed = false;
+
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema();
+      boolean wasNullable = fieldSchema.isNullable();
+      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
+
+      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+        HoodieSchema.Variant shreddedVariant = HoodieSchema.createVariantShreddedObject(
+            unwrapped.getAvroSchema().getName(),
+            unwrapped.getAvroSchema().getNamespace(),
+            unwrapped.getAvroSchema().getDoc(),
+            shreddedFields);
+        HoodieSchema replacement = wasNullable
+            ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
+        newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+        changed = true;
+      } else {
+        newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
+      }
+    }
+
+    if (!changed) {
+      return schema;
+    }
+
+    return HoodieSchema.createRecord(
+        schema.getAvroSchema().getName(),
+        schema.getAvroSchema().getNamespace(),
+        schema.getAvroSchema().getDoc(),
+        newFields);
+  }
+
+  /**
+   * Parses a DDL-style shredding schema string (e.g., {@code "a int, b string, c decimal(15,1)"})
+   * into a map of field names to their HoodieSchema types.
+   */
+  private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) {
+    Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+    // Split on top-level commas only so parameterized types such as decimal(15, 1) survive intact.
+    for (String fieldDef : StringUtils.splitTopLevelCommas(ddl)) {
+      String[] parts = fieldDef.split("\\s+", 2);
+      if (parts.length != 2) {
+        throw new IllegalArgumentException(
+            "Invalid shredding DDL field definition (expected 'name type'): " + fieldDef);
+      }
+      fields.put(parts[0].trim(), parseSimpleType(parts[1].trim()));
+    }
+    return fields;
+  }
+
+  /**
+   * Parses a simple type name into a HoodieSchema.
+   * Supports common types: int, long, string, double, float, boolean, binary, decimal(p,s).
+   */
+  private static HoodieSchema parseSimpleType(String type) {
+    String lower = type.toLowerCase();
+    switch (lower) {
+      case "int":
+      case "integer":
+        return HoodieSchema.create(HoodieSchemaType.INT);
+      case "long":
+      case "bigint":
+        return HoodieSchema.create(HoodieSchemaType.LONG);
+      case "string":
+        return HoodieSchema.create(HoodieSchemaType.STRING);
+      case "double":
+        return HoodieSchema.create(HoodieSchemaType.DOUBLE);
+      case "float":
+        return HoodieSchema.create(HoodieSchemaType.FLOAT);
+      case "boolean":
+        return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+      case "binary":
+        return HoodieSchema.create(HoodieSchemaType.BYTES);
+      default:
+        Matcher m = DECIMAL_PATTERN.matcher(lower);
+        if (m.matches()) {
+          return HoodieSchema.createDecimal(
+              Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
+        }
+        throw new IllegalArgumentException("Unsupported shredding type: " + type);
+    }
+  }
+
+  /**
+   * Strips shredding from variant fields in the schema.
+   * Replaces shredded variant fields with unshredded variants (removing typed_value).
+   */
+  private static HoodieSchema stripVariantShredding(HoodieSchema schema) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      return schema;
+    }
+
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<HoodieSchemaField> newFields = new ArrayList<>();
+    boolean changed = false;
+
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema();
+      boolean wasNullable = fieldSchema.isNullable();
+      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
+
+      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+        HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
+        if (variant.isShredded()) {
+          // Replace with unshredded variant
+          HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
+              unwrapped.getAvroSchema().getName(),
+              unwrapped.getAvroSchema().getNamespace(),
+              unwrapped.getAvroSchema().getDoc());
+          HoodieSchema replacement = wasNullable ? HoodieSchema.createNullable(unshredded) : unshredded;
+          newFields.add(field.withSchema(replacement));
+          changed = true;
+          continue;
+        }
+      }
+      newFields.add(field);
+    }
+
+    if (!changed) {
+      return schema;
+    }

Review Comment:
   🤖 nit: `applyForcedShreddingSchema` and `stripVariantShredding` share nearly 
identical scaffolding (record-type check, nullable unwrap, rebuild-on-change, 
record reconstruction) — could the variant-replacement logic be parameterized 
through a single helper that takes a `Function<HoodieSchema.Variant, 
HoodieSchema>` so the two stay in sync as the schema model evolves?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
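
The shared-helper idea in the comment above could look roughly like the sketch below. It is only an illustration under stated assumptions: `FieldSchema`, `Field`, `rewriteVariantFields`, `applyShredding`, and `stripShredding` are hypothetical stand-ins invented for this example, not the Hudi `HoodieSchema` API, and the callback returns an `Optional` (a small variation on the suggested `Function<HoodieSchema.Variant, HoodieSchema>`) so a caller can say "leave this field alone".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-ins for HoodieSchema / HoodieSchemaField; NOT the real Hudi API.
final class VariantRewriteSketch {

  static final class FieldSchema {
    final String type;       // e.g. "VARIANT" or "INT"
    final boolean nullable;
    final boolean shredded;  // only meaningful for VARIANT

    FieldSchema(String type, boolean nullable, boolean shredded) {
      this.type = type;
      this.nullable = nullable;
      this.shredded = shredded;
    }
  }

  static final class Field {
    final String name;
    final FieldSchema schema;

    Field(String name, FieldSchema schema) {
      this.name = name;
      this.schema = schema;
    }
  }

  // Shared scaffolding: walk the fields, ask the callback for a replacement of each
  // variant field, keep the field's original nullability, and return the input list
  // itself when nothing changed.
  static List<Field> rewriteVariantFields(
      List<Field> fields, Function<FieldSchema, Optional<FieldSchema>> rewriter) {
    List<Field> out = new ArrayList<>(fields.size());
    boolean changed = false;
    for (Field f : fields) {
      Optional<FieldSchema> replacement = "VARIANT".equals(f.schema.type)
          ? rewriter.apply(f.schema)
          : Optional.<FieldSchema>empty();
      if (replacement.isPresent()) {
        FieldSchema r = replacement.get();
        out.add(new Field(f.name, new FieldSchema(r.type, f.schema.nullable, r.shredded)));
        changed = true;
      } else {
        out.add(f);
      }
    }
    return changed ? out : fields;
  }

  // Analogue of applyForcedShreddingSchema: every variant field becomes shredded.
  static List<Field> applyShredding(List<Field> fields) {
    return rewriteVariantFields(fields,
        v -> Optional.of(new FieldSchema("VARIANT", v.nullable, true)));
  }

  // Analogue of stripVariantShredding: only already-shredded variants are replaced.
  static List<Field> stripShredding(List<Field> fields) {
    return rewriteVariantFields(fields, v -> v.shredded
        ? Optional.of(new FieldSchema("VARIANT", v.nullable, false))
        : Optional.<FieldSchema>empty());
  }
}
```

With this shape, each public method shrinks to a one-line lambda and the nullable handling and rebuild-on-change logic live in one place. Whether the real `HoodieSchema` types compose this cleanly would need to be checked against the PR.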



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
