voonhous commented on code in PR #18065:
URL: https://github.com/apache/hudi/pull/18065#discussion_r3371423821


##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -20,38 +20,256 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
 
 /**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriterSupport for plugging in the bloom filter and variant 
shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link 
HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code 
typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider} 
loaded via reflection.</p>
  */
 public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
 
   private final Option<HoodieBloomFilterWriteSupport<String>> 
bloomFilterWriteSupportOpt;
   private final Map<String, String> footerMetadata = new HashMap<>();
   protected final Properties properties;
 
+  /**
+   * Whether variant write shredding is enabled via config.
+   */
+  private final boolean variantWriteShreddingEnabled;
+
+  /**
+   * The effective (possibly shredded) HoodieSchema used for writing.
+   */
+  private final HoodieSchema effectiveHoodieSchema;
+
+  /**
+   * The effective Avro schema (derived from effectiveHoodieSchema).
+   */
+  private final Schema effectiveAvroSchema;
+
+  /**
+   * Indices of top-level variant fields that need shredding transformation.
+   * Empty array if no shredding is needed.
+   */
+  private final int[] shreddedVariantFieldIndices;
+
+  /**
+   * The shredded Avro sub-schema for each variant field at the corresponding 
index.
+   * Indexed by position in {@link #shreddedVariantFieldIndices}.
+   */
+  private final Schema[] shreddedVariantAvroSchemas;
+
+  /**
+   * The HoodieSchema.Variant for each variant field at the corresponding 
index.
+   * Indexed by position in {@link #shreddedVariantFieldIndices}.
+   */
+  private final HoodieSchema.Variant[] shreddedVariantHoodieSchemas;
+
+  /**
+   * Provider for variant shredding (loaded via reflection). Null if no 
shredding is needed.
+   */
+  private final VariantShreddingProvider shreddingProvider;
+
   public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, 
Option<BloomFilter> bloomFilterOpt,
                                 Properties properties) {
-    super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+    this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema, 
properties), bloomFilterOpt, properties);
+  }
+
+  private HoodieAvroWriteSupport(MessageType schema, HoodieSchema 
hoodieSchema, HoodieSchema effectiveSchema,
+                                 Option<BloomFilter> bloomFilterOpt, 
Properties properties) {
+    super(schema, effectiveSchema.toAvroSchema(), 
ConvertingGenericData.INSTANCE);
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
     this.properties = properties;
     String vectorMeta = 
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
     if (!vectorMeta.isEmpty()) {
       footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
     }
+
+    this.effectiveHoodieSchema = effectiveSchema;
+    this.effectiveAvroSchema = effectiveSchema.toAvroSchema();
+    this.variantWriteShreddingEnabled = Boolean.parseBoolean(
+        properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+            
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+    // Identify variant fields that need shredding
+    List<Integer> variantIndices = new ArrayList<>();
+    List<Schema> variantAvroSchemas = new ArrayList<>();
+    List<HoodieSchema.Variant> variantHoodieSchemas = new ArrayList<>();
+
+    if (variantWriteShreddingEnabled && effectiveSchema.getType() == 
HoodieSchemaType.RECORD) {
+      List<HoodieSchemaField> fields = effectiveSchema.getFields();
+      for (int i = 0; i < fields.size(); i++) {
+        HoodieSchemaField field = fields.get(i);
+        HoodieSchema fieldSchema = field.schema();
+        // Unwrap nullable union to get the underlying type
+        if (fieldSchema.isNullable()) {
+          fieldSchema = fieldSchema.getNonNullType();
+        }
+        if (fieldSchema.getType() == HoodieSchemaType.VARIANT) {
+          HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
+          if (variant.isShredded() && 
variant.getTypedValueField().isPresent()) {
+            variantIndices.add(i);
+            // Get the Avro sub-schema for this variant field from the 
effective schema
+            Schema fieldAvroSchema = 
effectiveAvroSchema.getFields().get(i).schema();
+            // Unwrap nullable union
+            if (fieldAvroSchema.getType() == Schema.Type.UNION) {
+              fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema);
+            }
+            variantAvroSchemas.add(fieldAvroSchema);
+            variantHoodieSchemas.add(variant);
+          }
+        }
+      }
+    }
+
+    this.shreddedVariantFieldIndices = 
variantIndices.stream().mapToInt(Integer::intValue).toArray();
+    this.shreddedVariantAvroSchemas = variantAvroSchemas.toArray(new 
Schema[0]);
+    this.shreddedVariantHoodieSchemas = variantHoodieSchemas.toArray(new 
HoodieSchema.Variant[0]);
+
+    // Load shredding provider via reflection if needed
+    if (shreddedVariantFieldIndices.length > 0) {
+      String providerClass = 
properties.getProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key());
+      if (providerClass == null || providerClass.isEmpty()) {
+        throw new HoodieException("Variant write shredding is enabled and the 
write schema requires shredding "
+            + "(typed_value columns present), but no VariantShreddingProvider 
is configured or available on the "
+            + "classpath. Set " + 
PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key() + " or add a provider "
+            + "implementation (e.g. the Spark variant module) to the 
classpath.");
+      }
+      this.shreddingProvider = (VariantShreddingProvider) 
ReflectionUtils.loadClass(providerClass);
+    } else {
+      this.shreddingProvider = null;
+    }
+  }
+
+  /**
+   * Generates the effective schema for writing, applying variant shredding 
configuration.
+   *
+   * <p>When shredding is disabled, shredded variant fields are replaced with 
unshredded
+   * variants (removing {@code typed_value}) so that the Parquet file does not 
contain
+   * unused typed_value columns.</p>
+   *
+   * <p>When shredding is enabled and a forced shredding schema is configured 
via
+   * {@link 
HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST},
+   * all variant fields are replaced with shredded variants using the forced 
schema.
+   * This handles the case where the input schema is unshredded but shredding 
is desired.</p>
+   *
+   * <p>When shredding is enabled without a forced schema, the schema is 
returned as-is
+   * (already-shredded variants stay shredded, unshredded variants stay 
unshredded).</p>
+   *
+   * @param hoodieSchema the original HoodieSchema
+   * @param properties   writer properties containing shredding configuration
+   * @return the effective schema to use for writing
+   */
+  public static HoodieSchema generateEffectiveSchema(HoodieSchema 
hoodieSchema, Properties properties) {
+    boolean shreddingEnabled = Boolean.parseBoolean(
+        properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+            
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+    if (!shreddingEnabled) {
+      // Schemas from clustering/compaction may still be shredded (read from 
on-disk Parquet files
+      // written with shredding enabled), so we need to strip typed_value when 
shredding
+      // is disabled.
+      return unshreddVariantFields(hoodieSchema);
+    }
+
+    // Check if a forced shredding schema is configured
+    String forceShreddingSchema = properties.getProperty(
+        PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key());
+    if (forceShreddingSchema != null && !forceShreddingSchema.isEmpty()) {
+      return applyForcedShreddingSchema(hoodieSchema, forceShreddingSchema);
+    }
+
+    // When enabled without forced schema, use the schema as-is
+    // (shredded variants stay shredded, unshredded variants stay unshredded)
+    return hoodieSchema;
+  }
+
+  /**
+   * Overloaded version accepting HoodieConfig for use by factories.
+   */
+  public static HoodieSchema generateEffectiveSchema(HoodieSchema 
hoodieSchema, HoodieConfig config) {
+    return generateEffectiveSchema(hoodieSchema, config.getProps());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void write(T record) {
+    if (shreddedVariantFieldIndices.length > 0 && shreddingProvider != null) {
+      IndexedRecord inputRecord = (IndexedRecord) record;
+      GenericRecord shreddedRecord = new 
GenericData.Record(effectiveAvroSchema);
+
+      // Copy all fields, transforming variant fields that need shredding
+      List<Schema.Field> effectiveFields = effectiveAvroSchema.getFields();
+      Schema inputSchema = inputRecord.getSchema();
+
+      for (int i = 0; i < effectiveFields.size(); i++) {
+        Schema.Field effectiveField = effectiveFields.get(i);
+        String fieldName = effectiveField.name();
+        Schema.Field inputField = inputSchema.getField(fieldName);
+        if (inputField == null) {
+          continue;
+        }
+
+        int variantIdx = findVariantIndex(i);
+        if (variantIdx >= 0) {
+          // This is a shredded variant field - transform it
+          Object fieldValue = inputRecord.get(inputField.pos());
+          if (fieldValue instanceof GenericRecord) {
+            GenericRecord variantRecord = (GenericRecord) fieldValue;
+            GenericRecord shreddedVariant = 
shreddingProvider.shredVariantRecord(
+                variantRecord,
+                shreddedVariantAvroSchemas[variantIdx],
+                shreddedVariantHoodieSchemas[variantIdx]);
+            shreddedRecord.put(i, shreddedVariant);

Review Comment:
   Good catch. This is the read-then-reshred path: compaction/clustering over a 
base file written with shredding. There is no reader-side step that 
reconstructs the unshredded `{metadata, value}` form, so an already-shredded 
record reaches the writer and `shredVariantRecord` returns null and drops it.
   
   Same root cause as the symmetric shredding-disabled case: 
https://github.com/apache/hudi/pull/18065#discussion_r3368472566
   
   For now I added a fail-fast guard in `write()` 
(`assertInputNotAlreadyShredded`) that throws on an already-shredded input 
variant instead of silently losing data. The real fix (reconstruct on read, 
then re-shred) is tracked in #18931.



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########


Review Comment:
   Confirmed, and this is the symmetric counterpart of the shredding-enabled 
thread: https://github.com/apache/hudi/pull/18065#discussion_r3368472554
   
   Same root cause, no reader-side reconstruction of the unshredded variant. 
With shredding disabled the unshredded writer schema makes `value` REQUIRED, so 
an already-shredded record (value null) fails at write.
   
   Both are now guarded by the same fail-fast check in `write()` 
(`assertInputNotAlreadyShredded`), and the real fix is tracked in #18931.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to