This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 23fe7bfcd7e6 feat(variant): Add support to write shredded variants for HoodieRecordType.AVRO (#18065)
23fe7bfcd7e6 is described below

commit 23fe7bfcd7e6cbbc72a05472384f942a88a812b8
Author: voonhous <[email protected]>
AuthorDate: Thu Jun 18 15:07:09 2026 +0800

    feat(variant): Add support to write shredded variants for HoodieRecordType.AVRO (#18065)
    
    Adds write support for shredded Variant on the AVRO record path. Variants are
    decomposed into a typed_value column for better compression and column pruning.
    Shredding is delegated to a VariantShreddingProvider (in hudi-common, loaded
    reflectively), with a Spark 4 implementation built on Spark's
    VariantShreddingWriter.
    
    - Config: new advanced hoodie.parquet.variant.shredding.provider.class
      (sinceVersion 1.3.0), auto-detected from the classpath when unset. Gated by
      the existing write.shredding.enabled; reads governed by allow.reading.shredded.
    - Write path: HoodieAvroFileWriterFactory builds the Parquet schema from the
      effective (shredded) schema so value is nullable and typed_value is present;
      injects the provider on a copied Properties so the shared config is never
      mutated. Fails fast when shredding is required but no provider is available,
      and on the not-yet-supported read-then-reshred path (#18931).
    - Spark read: BaseSpark4Adapter recognizes both unshredded (2-field) and
      shredded (3-field) physical layouts as Variant. Spark 4.0.x keeps a reorder
      workaround for its hardcoded [value, metadata] converter; 4.1+ reads by name
      (SPARK-54410), removable later (#18935). No session SQLConf mutation - Spark's
      converter reads the flag via SQLConf.get.
    - DDL parsing: shared StringUtils.splitTopLevelCommas (paren-aware) keeps
      decimal(15, 1) intact, reused by both the shredding DDL parser and the vector
      column metadata parser.
    - Tests: Spark integration (TestVariantDataType) plus unit coverage for the
      forced-shredding DDL (incl. decimal), shredded schema shape, the
      read-then-reshred guard, and splitTopLevelCommas.
    
    Limitations: shredded writes require Spark 4.0+; reading shredded data back
    requires Spark 4.1+ (Spark 4.0 read deferred, #18931).
    
    Closes #18066
    Closes #17748
---
 .../apache/hudi/avro/VariantShreddingProvider.java |  66 ++++
 .../hudi/common/config/HoodieStorageConfig.java    |  22 +-
 .../apache/hudi/common/schema/HoodieSchema.java    |  30 +-
 .../apache/hudi/avro/HoodieAvroWriteSupport.java   | 424 ++++++++++++++++++++-
 .../hadoop/HoodieAvroFileWriterFactory.java        |  37 +-
 .../avro/TestHoodieAvroWriteSupportShredding.java  |  71 ++++
 .../io/hadoop/TestHoodieBaseParquetWriter.java     |  51 +++
 .../org/apache/hudi/common/util/StringUtils.java   |  36 ++
 .../apache/hudi/common/util/TestStringUtils.java   |  15 +
 .../sql/hudi/dml/schema/TestVariantDataType.scala  | 158 ++++++++
 .../variant/Spark4VariantShreddingProvider.java    | 401 +++++++++++++++++++
 .../spark/sql/adapter/BaseSpark4Adapter.scala      |  23 +-
 .../parquet/Spark40HoodieParquetReadSupport.scala  |   2 +-
 13 files changed, 1290 insertions(+), 46 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
new file mode 100644
index 000000000000..90ffbee1e53f
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Interface for shredding variant values at write time.
+ * <p>
+ * Implementations parse variant binary data (value + metadata bytes) and 
produce
+ * a shredded {@link GenericRecord} with typed_value columns populated 
according
+ * to the shredding schema.
+ * <p>
+ * This interface allows the variant binary parsing logic (which may depend on
+ * engine-specific libraries like Spark's variant module) to be loaded via 
reflection,
+ * keeping the core write support free of engine-specific dependencies.
+ */
+public interface VariantShreddingProvider {
+
+  /**
+   * Transform an unshredded variant GenericRecord into a shredded one.
+   * <p>
+   * The input record is expected to have:
+   * <ul>
+   *   <li>{@code value}: ByteBuffer containing the variant value binary</li>
+   *   <li>{@code metadata}: ByteBuffer containing the variant metadata 
binary</li>
+   * </ul>
+   * <p>
+   * The output record should conform to {@code shreddedSchema} and have:
+   * <ul>
+   *   <li>{@code value}: ByteBuffer or null (null when typed_value captures 
the full value)</li>
+   *   <li>{@code metadata}: ByteBuffer (always present)</li>
+   *   <li>{@code typed_value}: the typed representation extracted from the 
variant binary,
+   *       or null if the variant type does not match the typed_value 
schema</li>
+   * </ul>
+   *
+   * @param unshreddedVariant GenericRecord with {value: ByteBuffer, metadata: 
ByteBuffer}
+   * @param shreddedSchema    target Avro schema with {value: nullable 
ByteBuffer, metadata: ByteBuffer, typed_value: type}
+   * @param variantSchema     HoodieSchema.Variant containing the shredding 
schema information
+   * @return a GenericRecord conforming to shreddedSchema with typed_value 
populated where possible
+   */
+  GenericRecord shredVariantRecord(
+      GenericRecord unshreddedVariant,
+      Schema shreddedSchema,
+      HoodieSchema.Variant variantSchema);
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index cbdf5cc223e0..4c3655e6cc64 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -235,11 +235,12 @@ public class HoodieStorageConfig extends HoodieConfig {
       .noDefaultValue()
       .markAdvanced()
       .sinceVersion("1.1.0")
-      .withDocumentation("Forces a specific shredding schema for all variant 
columns, intended for testing. "
+      .withDocumentation("Test-only: forces a specific shredding schema for 
all variant columns. Not intended "
+          + "for production use; it exists solely to exercise the shredding 
write path in tests, mirroring "
+          + "Spark's internal spark.sql.variant.forceShreddingSchemaForTest. "
           + "The value should be a DDL-format schema string (e.g., 'a int, b 
string, c decimal(15, 1)'). "
           + "When set and write shredding is enabled, this schema overrides 
the schema-driven shredding "
-          + "configuration for all variant columns. "
-          + "Equivalent to Spark's 
spark.sql.variant.forceShreddingSchemaForTest.");
+          + "configuration for all variant columns.");
 
   public static final ConfigProperty<Boolean> 
PARQUET_VARIANT_ALLOW_READING_SHREDDED = ConfigProperty
       .key("hoodie.parquet.variant.allow.reading.shredded")
@@ -250,6 +251,16 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "When disabled, only unshredded variant data can be read. "
           + "Equivalent to Spark's spark.sql.variant.allowReadingShredded.");
 
+  public static final ConfigProperty<String> 
PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS = ConfigProperty
+      .key("hoodie.parquet.variant.shredding.provider.class")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.3.0")
+      .withDocumentation("Fully-qualified class name of the 
VariantShreddingProvider implementation "
+          + "used to shred variant values at write time in the Avro record 
path. "
+          + "The provider parses variant binary data and populates typed_value 
columns. "
+          + "When not set, the provider is auto-detected from the classpath.");
+
   public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE = 
ConfigProperty
       .key("hoodie.parquet.write.utc-timezone.enabled")
       .defaultValue(true)
@@ -596,11 +607,6 @@ public class HoodieStorageConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder parquetVariantForceShreddingSchemaForTest(String 
schemaString) {
-      storageConfig.setValue(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST, 
schemaString);
-      return this;
-    }
-
     public Builder parquetVariantAllowReadingShredded(boolean allowed) {
       storageConfig.setValue(PARQUET_VARIANT_ALLOW_READING_SHREDDED, 
String.valueOf(allowed));
       return this;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 0b0978d552a2..81f467124f20 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.schema;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieAvroSchemaException;
@@ -282,35 +283,18 @@ public class HoodieSchema implements Serializable {
    * @return set of field names (preserves insertion order), or empty set if 
input is null / empty
    */
   public static Set<String> parseVectorColumnNames(String footerValue) {
-    if (footerValue == null || footerValue.isEmpty()) {
-      return Collections.emptySet();
-    }
     LinkedHashSet<String> names = new LinkedHashSet<>();
-    int depth = 0;
-    int start = 0;
-    for (int i = 0; i < footerValue.length(); i++) {
-      char c = footerValue.charAt(i);
-      if (c == '(') {
-        depth++;
-      } else if (c == ')') {
-        depth--;
-      } else if (c == ',' && depth == 0) {
-        addVectorColumnName(footerValue, start, i, names);
-        start = i + 1;
+    // Split on top-level commas only so the comma inside a VECTOR(dim, 
elemType) descriptor is
+    // not mistaken for a column separator.
+    for (String entry : StringUtils.splitTopLevelCommas(footerValue)) {
+      int colon = entry.indexOf(':');
+      if (colon > 0) {
+        names.add(entry.substring(0, colon).trim());
       }
     }
-    addVectorColumnName(footerValue, start, footerValue.length(), names);
     return names;
   }
 
-  private static void addVectorColumnName(String s, int start, int end, 
Set<String> names) {
-    int colon = s.indexOf(':', start);
-    if (colon > start && colon < end) {
-      names.add(s.substring(start, colon).trim());
-    }
-  }
-
-
   private Schema avroSchema;
   private HoodieSchemaType type;
   private transient List<HoodieSchemaField> fields;
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index df7930060fe9..3cd3c965388b 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -20,22 +20,46 @@
 package org.apache.hudi.avro;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.avro.AvroWriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
 
 /**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriterSupport for plugging in the bloom filter and variant 
shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link 
HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code 
typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider} 
loaded via reflection.</p>
  */
 public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
 
@@ -43,15 +67,236 @@ public class HoodieAvroWriteSupport<T> extends 
AvroWriteSupport<T> {
   private final Map<String, String> footerMetadata = new HashMap<>();
   protected final Properties properties;
 
+  /**
+   * Whether variant write shredding is enabled via config.
+   */
+  private final boolean variantWriteShreddingEnabled;
+
+  /**
+   * The effective (possibly shredded) Avro schema used for writing.
+   */
+  private final Schema effectiveAvroSchema;
+
+  /**
+   * Variant fields that need shredding, keyed by their index in the effective 
schema.
+   * Empty if no shredding is needed.
+   */
+  private final Map<Integer, ShreddedVariantField> shreddedVariantFields;
+
+  /**
+   * Provider for variant shredding (loaded via reflection). Null if no 
shredding is needed.
+   */
+  private final VariantShreddingProvider shreddingProvider;
+
+  /**
+   * Names of all variant-typed top-level fields, regardless of shredding. 
Used to fail fast on the
+   * not-yet-supported read-then-reshred path (compaction/clustering over an 
already-shredded base
+   * file). See https://github.com/apache/hudi/issues/18931.
+   */
+  private final String[] variantFieldNames;
+
   public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, 
Option<BloomFilter> bloomFilterOpt,
                                 Properties properties) {
-    super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+    this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema, 
properties), bloomFilterOpt, properties);
+  }
+
+  private HoodieAvroWriteSupport(MessageType schema, HoodieSchema 
hoodieSchema, HoodieSchema effectiveSchema,
+                                 Option<BloomFilter> bloomFilterOpt, 
Properties properties) {
+    super(schema, effectiveSchema.toAvroSchema(), 
ConvertingGenericData.INSTANCE);
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
     this.properties = properties;
     String vectorMeta = 
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
     if (!vectorMeta.isEmpty()) {
       footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
     }
+
+    this.effectiveAvroSchema = effectiveSchema.toAvroSchema();
+    this.variantWriteShreddingEnabled = Boolean.parseBoolean(
+        properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+            
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+    // Single pass over the effective schema fields: collect every 
variant-typed field name (for the
+    // read-then-reshred guard) and, when shredding is enabled, the subset 
that needs shredding.
+    Map<Integer, ShreddedVariantField> shreddedFields = new LinkedHashMap<>();
+    List<String> variantNames = new ArrayList<>();
+
+    if (effectiveSchema.getType() == HoodieSchemaType.RECORD) {
+      List<HoodieSchemaField> fields = effectiveSchema.getFields();
+      for (int i = 0; i < fields.size(); i++) {
+        HoodieSchemaField field = fields.get(i);
+        HoodieSchema fieldSchema = field.schema();
+        // Unwrap nullable union to get the underlying type
+        if (fieldSchema.isNullable()) {
+          fieldSchema = fieldSchema.getNonNullType();
+        }
+        if (fieldSchema.getType() != HoodieSchemaType.VARIANT) {
+          continue;
+        }
+        variantNames.add(field.name());
+        if (variantWriteShreddingEnabled) {
+          HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
+          if (variant.isShredded() && 
variant.getTypedValueField().isPresent()) {
+            // Get the Avro sub-schema for this variant field from the 
effective schema
+            Schema fieldAvroSchema = 
effectiveAvroSchema.getFields().get(i).schema();
+            // Unwrap nullable union
+            if (fieldAvroSchema.getType() == Schema.Type.UNION) {
+              fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema);
+            }
+            shreddedFields.put(i, new ShreddedVariantField(fieldAvroSchema, 
variant));
+          }
+        }
+      }
+    }
+
+    this.shreddedVariantFields = shreddedFields;
+    this.variantFieldNames = variantNames.toArray(new String[0]);
+
+    // Load shredding provider via reflection if needed
+    if (!shreddedVariantFields.isEmpty()) {
+      String providerClass = 
properties.getProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key());
+      if (providerClass == null || providerClass.isEmpty()) {
+        throw new HoodieException("Variant write shredding is enabled and the 
write schema requires shredding "
+            + "(typed_value columns present), but no VariantShreddingProvider 
is configured or available on the "
+            + "classpath. Set " + 
PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key() + " or add a provider "
+            + "implementation (e.g. the Spark variant module) to the 
classpath.");
+      }
+      this.shreddingProvider = (VariantShreddingProvider) 
ReflectionUtils.loadClass(providerClass);
+    } else {
+      this.shreddingProvider = null;
+    }
+  }
+
+  /**
+   * Generates the effective schema for writing, applying variant shredding 
configuration.
+   *
+   * <p>When shredding is disabled, shredded variant fields are replaced with 
unshredded
+   * variants (removing {@code typed_value}) so that the Parquet file does not 
contain
+   * unused typed_value columns.</p>
+   *
+   * <p>When shredding is enabled and a forced shredding schema is configured 
via
+   * {@link 
HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST},
+   * all variant fields are replaced with shredded variants using the forced 
schema.
+   * This handles the case where the input schema is unshredded but shredding 
is desired.</p>
+   *
+   * <p>When shredding is enabled without a forced schema, the schema is 
returned as-is
+   * (already-shredded variants stay shredded, unshredded variants stay 
unshredded).</p>
+   *
+   * @param hoodieSchema the original HoodieSchema
+   * @param properties   writer properties containing shredding configuration
+   * @return the effective schema to use for writing
+   */
+  public static HoodieSchema generateEffectiveSchema(HoodieSchema 
hoodieSchema, Properties properties) {
+    boolean shreddingEnabled = Boolean.parseBoolean(
+        properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+            
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+    if (!shreddingEnabled) {
+      // Schemas from clustering/compaction may still be shredded (read from 
on-disk Parquet files
+      // written with shredding enabled), so we need to strip typed_value when 
shredding
+      // is disabled.
+      return stripVariantShredding(hoodieSchema);
+    }
+
+    // Check if a forced shredding schema is configured
+    String forceShreddingSchema = properties.getProperty(
+        PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key());
+    if (forceShreddingSchema != null && !forceShreddingSchema.isEmpty()) {
+      return applyForcedShreddingSchema(hoodieSchema, forceShreddingSchema);
+    }
+
+    // When enabled without forced schema, use the schema as-is
+    // (shredded variants stay shredded, unshredded variants stay unshredded)
+    return hoodieSchema;
+  }
+
+  /**
+   * Overloaded version accepting HoodieConfig for use by factories.
+   */
+  public static HoodieSchema generateEffectiveSchema(HoodieSchema 
hoodieSchema, HoodieConfig config) {
+    return generateEffectiveSchema(hoodieSchema, config.getProps());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void write(T record) {
+    if (variantFieldNames.length > 0) {
+      assertInputNotAlreadyShredded((IndexedRecord) record);
+    }
+    if (!shreddedVariantFields.isEmpty() && shreddingProvider != null) {
+      super.write((T) shredRecord((IndexedRecord) record));
+    } else {
+      super.write(record);
+    }
+  }
+
+  /**
+   * Builds a shredded copy of {@code inputRecord}: variant fields configured 
for shredding are
+   * transformed via the {@link VariantShreddingProvider} to populate {@code 
typed_value}; all other
+   * fields are copied across as-is.
+   */
+  private GenericRecord shredRecord(IndexedRecord inputRecord) {
+    GenericRecord shreddedRecord = new GenericData.Record(effectiveAvroSchema);
+
+    // Copy all fields, transforming variant fields that need shredding
+    List<Schema.Field> effectiveFields = effectiveAvroSchema.getFields();
+    Schema inputSchema = inputRecord.getSchema();
+
+    for (int i = 0; i < effectiveFields.size(); i++) {
+      Schema.Field effectiveField = effectiveFields.get(i);
+      String fieldName = effectiveField.name();
+      Schema.Field inputField = inputSchema.getField(fieldName);
+      if (inputField == null) {
+        continue;
+      }
+
+      ShreddedVariantField shreddedField = shreddedVariantFields.get(i);
+      if (shreddedField != null) {
+        // This is a shredded variant field - transform it
+        Object fieldValue = inputRecord.get(inputField.pos());
+        if (fieldValue instanceof GenericRecord) {
+          GenericRecord variantRecord = (GenericRecord) fieldValue;
+          GenericRecord shreddedVariant = shreddingProvider.shredVariantRecord(
+              variantRecord,
+              shreddedField.avroSchema,
+              shreddedField.hoodieSchema);
+          shreddedRecord.put(i, shreddedVariant);
+        } else {
+          // Null or unexpected type - copy as-is
+          shreddedRecord.put(i, fieldValue);
+        }
+      } else {
+        // Non-variant field - copy as-is
+        shreddedRecord.put(i, inputRecord.get(inputField.pos()));
+      }
+    }
+
+    return shreddedRecord;
+  }
+
+  /**
+   * Fails fast on the not-yet-supported read-then-reshred path. Records read 
from an already-shredded
+   * base file (compaction/clustering) arrive with a populated {@code 
typed_value} and a possibly-null
+   * {@code value}. The writer has no logic to reconstruct the unshredded 
variant, so shredding would
+   * silently drop the payload (shredding enabled) and the parquet writer 
would reject the null at the
+   * REQUIRED {@code value} field (shredding disabled). Reconstruction is 
tracked in
+   * https://github.com/apache/hudi/issues/18931.
+   */
+  private void assertInputNotAlreadyShredded(IndexedRecord inputRecord) {
+    Schema inputSchema = inputRecord.getSchema();
+    for (String fieldName : variantFieldNames) {
+      Schema.Field field = inputSchema.getField(fieldName);
+      if (field == null) {
+        continue;
+      }
+      Object value = inputRecord.get(field.pos());
+      if (value instanceof GenericRecord
+          && ((GenericRecord) 
value).getSchema().getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD) != 
null) {
+        throw new HoodieException("Writing an already-shredded variant field 
'" + fieldName
+            + "' is not supported yet. Compaction/clustering read a base file 
written with variant "
+            + "shredding and re-wrote it through the Avro path; the reader 
does not yet reconstruct "
+            + "the unshredded variant. Tracked in https://github.com/apache/hudi/issues/18931.");
+      }
+    }
   }
 
   @Override
@@ -74,6 +319,179 @@ public class HoodieAvroWriteSupport<T> extends 
AvroWriteSupport<T> {
     footerMetadata.put(key, value);
   }
 
+  /**
+   * Bundles the Avro sub-schema and {@link HoodieSchema.Variant} for a 
shredded variant field,
+   * keyed by effective-schema field index in {@link #shreddedVariantFields}.
+   */
+  private static final class ShreddedVariantField {
+    private final Schema avroSchema;
+    private final HoodieSchema.Variant hoodieSchema;
+
+    ShreddedVariantField(Schema avroSchema, HoodieSchema.Variant hoodieSchema) 
{
+      this.avroSchema = avroSchema;
+      this.hoodieSchema = hoodieSchema;
+    }
+  }
+
+  private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+      "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+  /**
+   * Applies a forced shredding schema to all variant fields in the given 
schema.
+   * The forced schema DDL (e.g., {@code "a int, b string"}) defines the 
typed_value
+   * fields that will be added to each variant column.
+   */
+  private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, 
String ddl) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      return schema;
+    }
+
+    Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl);
+
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<HoodieSchemaField> newFields = new ArrayList<>();
+    boolean changed = false;
+
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema();
+      boolean wasNullable = fieldSchema.isNullable();
+      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : 
fieldSchema;
+
+      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+        HoodieSchema.Variant shreddedVariant = 
HoodieSchema.createVariantShreddedObject(
+            unwrapped.getAvroSchema().getName(),
+            unwrapped.getAvroSchema().getNamespace(),
+            unwrapped.getAvroSchema().getDoc(),
+            shreddedFields);
+        HoodieSchema replacement = wasNullable
+            ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
+        
newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+        changed = true;
+      } else {
+        newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
+      }
+    }
+
+    if (!changed) {
+      return schema;
+    }
+
+    return HoodieSchema.createRecord(
+        schema.getAvroSchema().getName(),
+        schema.getAvroSchema().getNamespace(),
+        schema.getAvroSchema().getDoc(),
+        newFields);
+  }
+
+  /**
+   * Parses a DDL-style shredding schema string (e.g., {@code "a int, b 
string, c decimal(15,1)"})
+   * into a map of field names to their HoodieSchema types.
+   */
+  private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) {
+    Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+    // Split on top-level commas only so parameterized types such as 
decimal(15, 1) survive intact.
+    for (String fieldDef : StringUtils.splitTopLevelCommas(ddl)) {
+      String[] parts = fieldDef.split("\\s+", 2);
+      if (parts.length != 2) {
+        throw new IllegalArgumentException(
+            "Invalid shredding DDL field definition (expected 'name type'): " 
+ fieldDef);
+      }
+      fields.put(parts[0].trim(), parseSimpleType(parts[1].trim()));
+    }
+    return fields;
+  }
+
+  /**
+   * Parses a simple type name into a HoodieSchema.
+   * Supports common types: int, long, string, double, float, boolean, binary, 
decimal(p,s).
+   */
+  private static HoodieSchema parseSimpleType(String type) {
+    String lower = type.toLowerCase();
+    switch (lower) {
+      case "int":
+      case "integer":
+        return HoodieSchema.create(HoodieSchemaType.INT);
+      case "long":
+      case "bigint":
+        return HoodieSchema.create(HoodieSchemaType.LONG);
+      case "string":
+        return HoodieSchema.create(HoodieSchemaType.STRING);
+      case "double":
+        return HoodieSchema.create(HoodieSchemaType.DOUBLE);
+      case "float":
+        return HoodieSchema.create(HoodieSchemaType.FLOAT);
+      case "boolean":
+        return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+      case "binary":
+        return HoodieSchema.create(HoodieSchemaType.BYTES);
+      default:
+        Matcher m = DECIMAL_PATTERN.matcher(lower);
+        if (m.matches()) {
+          return HoodieSchema.createDecimal(
+              Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
+        }
+        throw new IllegalArgumentException("Unsupported shredding type: " + 
type);
+    }
+  }
+
+  /**
+   * Strips shredding from variant fields in the schema.
+   * Replaces shredded variant fields with unshredded variants (removing 
typed_value).
+   */
+  private static HoodieSchema stripVariantShredding(HoodieSchema schema) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      return schema;
+    }
+
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<HoodieSchemaField> newFields = new ArrayList<>();
+    boolean changed = false;
+
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema();
+      boolean wasNullable = fieldSchema.isNullable();
+      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : 
fieldSchema;
+
+      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+        HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
+        if (variant.isShredded()) {
+          // Replace with unshredded variant
+          HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
+              unwrapped.getAvroSchema().getName(),
+              unwrapped.getAvroSchema().getNamespace(),
+              unwrapped.getAvroSchema().getDoc());
+          HoodieSchema replacement = wasNullable ? 
HoodieSchema.createNullable(unshredded) : unshredded;
+          newFields.add(field.withSchema(replacement));
+          changed = true;
+          continue;
+        }
+      }
+      newFields.add(field);
+    }
+
+    if (!changed) {
+      return schema;
+    }
+
+    return HoodieSchema.createRecord(
+        schema.getAvroSchema().getName(),
+        schema.getAvroSchema().getNamespace(),
+        schema.getAvroSchema().getDoc(),
+        newFields);
+  }
+
+  /**
+   * Extracts the non-null type from a union schema.
+   */
+  private static Schema getNonNullFromUnion(Schema unionSchema) {
+    for (Schema type : unionSchema.getTypes()) {
+      if (type.getType() != Schema.Type.NULL) {
+        return type;
+      }
+    }
+    throw new IllegalArgumentException("Union schema does not contain a 
non-null type: " + unionSchema);
+  }
+
   private static class HoodieBloomFilterAvroWriteSupport extends 
HoodieBloomFilterWriteSupport<String> {
     public HoodieBloomFilterAvroWriteSupport(BloomFilter bloomFilter) {
       super(bloomFilter);
@@ -84,4 +502,4 @@ public class HoodieAvroWriteSupport<T> extends 
AvroWriteSupport<T> {
       return StringUtils.getUTF8Bytes(key);
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
index 83ebc7159e01..4d2630d2e7aa 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieParquetConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -54,10 +55,16 @@ import java.io.OutputStream;
 import java.util.Properties;
 
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WRITER_TO_ALLOW_DUPLICATES;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
 import static 
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
 
 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
 
+  // Sole provider today: variant shredding currently requires Spark 4.0+. If 
more providers are
+  // added, restore a candidate list here and try each in turn.
+  private static final String SPARK4_VARIANT_SHREDDING_PROVIDER =
+      "org.apache.hudi.variant.Spark4VariantShreddingProvider";
+
   public HoodieAvroFileWriterFactory(HoodieStorage storage) {
     super(storage);
   }
@@ -140,9 +147,37 @@ public class HoodieAvroFileWriterFactory extends 
HoodieFileWriterFactory {
                                                            
StorageConfiguration storageConf,
                                                            boolean 
enableBloomFilter) {
     Option<BloomFilter> filter = enableBloomFilter ? 
Option.of(createBloomFilter(config)) : Option.empty();
+    HoodieSchema effectiveSchema = 
HoodieAvroWriteSupport.generateEffectiveSchema(schema, config);
+    // Work on a copy so we never mutate the shared config's internal 
Properties.
+    Properties props = TypedProperties.copy(config.getProps());
+    // Auto-detect variant shredding provider from classpath if not explicitly 
configured
+    if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
+      String detectedClass = detectShreddingProviderClass();
+      if (detectedClass != null) {
+        props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), 
detectedClass);
+      }
+    }
     return (HoodieAvroWriteSupport) ReflectionUtils.loadClass(
         
config.getStringOrDefault(HoodieStorageConfig.HOODIE_AVRO_WRITE_SUPPORT_CLASS),
         new Class<?>[] {MessageType.class, HoodieSchema.class, Option.class, 
Properties.class},
-        getAvroSchemaConverter((Configuration) 
storageConf.unwrapAs(Configuration.class)).convert(schema), schema, filter, 
config.getProps());
+        // Build the Parquet schema from the effective (possibly shredded) 
schema so the message type
+        // matches the records actually written - a shredded variant has a 
nullable value and a
+        // typed_value column; converting the original schema would mark value 
REQUIRED and drop
+        // typed_value, failing the write with "Null-value for required field: 
value".
+        getAvroSchemaConverter((Configuration) 
storageConf.unwrapAs(Configuration.class)).convert(effectiveSchema), schema, 
filter, props);
+  }
+
+  /**
+   * Auto-detect a {@link org.apache.hudi.avro.VariantShreddingProvider} implementation
+   * available on the classpath.
+   *
+   * <p>The class is only probed, not initialised: static initialisers of the provider are
+   * not run here. A provider class that is present but cannot be loaded (for example because
+   * one of its own dependencies is missing) is treated the same as an absent one, so callers
+   * see "no provider detected" rather than a {@link LinkageError} from a best-effort probe.</p>
+   *
+   * @return the fully-qualified class name if the provider can be loaded, or {@code null}
+   */
+  private static String detectShreddingProviderClass() {
+    try {
+      // Same class loader Class.forName(String) would use, but without forcing initialisation.
+      Class.forName(SPARK4_VARIANT_SHREDDING_PROVIDER, false,
+          HoodieAvroFileWriterFactory.class.getClassLoader());
+      return SPARK4_VARIANT_SHREDDING_PROVIDER;
+    } catch (ClassNotFoundException | LinkageError e) {
+      // not on classpath, or on the classpath but not loadable in this runtime
+      return null;
+    }
   }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupportShredding.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupportShredding.java
new file mode 100644
index 000000000000..eb936ac512e4
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupportShredding.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestHoodieAvroWriteSupportShredding {
+
+  /**
+   * Regression test for the forced-shredding DDL parser. The DDL contains a parameterized
+   * type, {@code decimal(15, 1)}, which is the documented example on
+   * {@link HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST}; its inner
+   * comma must not be treated as a field separator. A naive {@code ddl.split(",")} tore that
+   * field apart and threw "Unsupported shredding type: decimal(15".
+   */
+  @Test
+  void forcedShreddingDdlTreatsDecimalParensAsOneField() {
+    Properties props = new Properties();
+    props.setProperty(HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(), "true");
+    props.setProperty(HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key(),
+        "a int, b string, c decimal(15, 1)");
+
+    HoodieSchemaField variantColumn = HoodieSchemaField.of("v", HoodieSchema.createVariant());
+    HoodieSchema record = HoodieSchema.createRecord(
+        "test_record", "org.apache.hudi.test", null, Collections.singletonList(variantColumn));
+
+    HoodieSchema effective = HoodieAvroWriteSupport.generateEffectiveSchema(record, props);
+
+    // Drill down: record -> variant column -> typed_value -> shredded object fields.
+    HoodieSchema variant = stripNullable(effective.getFields().get(0).schema());
+    HoodieSchema typedValue = stripNullable(variant.getFields().stream()
+        .filter(f -> HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD.equals(f.name()))
+        .map(HoodieSchemaField::schema)
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("shredded variant is missing typed_value: " + variant)));
+
+    List<String> shreddedFieldNames = typedValue.getFields().stream()
+        .map(HoodieSchemaField::name)
+        .collect(Collectors.toList());
+    assertEquals(Arrays.asList("a", "b", "c"), shreddedFieldNames);
+  }
+
+  /** Returns the non-null branch of a nullable schema, or the schema itself otherwise. */
+  private static HoodieSchema stripNullable(HoodieSchema schema) {
+    return schema.isNullable() ? schema.getNonNullType() : schema;
+  }
+}
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
index e4e86abef939..9cbee1b44cbc 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
@@ -24,15 +24,22 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieParquetConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -41,12 +48,17 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
 import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieBaseParquetWriter {
@@ -118,4 +130,43 @@ public class TestHoodieBaseParquetWriter {
           "The writer stops write new records while the file doesn't reach the 
max file size limit");
     }
   }
+
+  /**
+   * A record read from an already-shredded base file (compaction/clustering) arrives with a
+   * shredded variant layout. The Avro write support cannot reconstruct the unshredded variant
+   * yet, so it must fail fast rather than silently drop the payload. Guard tracked in
+   * https://github.com/apache/hudi/issues/18931.
+   */
+  @Test
+  public void testRejectAlreadyShreddedVariantInput() {
+    // Writer side: shredding disabled so no VariantShreddingProvider is required; the variant
+    // field is still tracked.
+    Properties writerProps = new Properties();
+    writerProps.setProperty(HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(), "false");
+
+    HoodieSchema writerSchema = HoodieSchema.createRecord("guardTest", null, null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+        HoodieSchemaField.of("v", HoodieSchema.createVariant())));
+    HoodieSchema effectiveSchema = HoodieAvroWriteSupport.generateEffectiveSchema(writerSchema, writerProps);
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(effectiveSchema.toAvroSchema()),
+        writerSchema, Option.empty(), writerProps);
+
+    // Input side: the variant column uses the shredded layout (it declares typed_value).
+    Map<String, HoodieSchema> shreddedFields = new LinkedHashMap<>();
+    shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.INT));
+    Schema shreddedVariantAvro =
+        HoodieSchema.createVariantShreddedObject(null, null, null, shreddedFields).getAvroSchema();
+
+    Schema inputAvro = Schema.createRecord("guardInput", null, null, false);
+    inputAvro.setFields(Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.INT), null, (Object) null),
+        new Schema.Field("v", shreddedVariantAvro, null, (Object) null)));
+
+    // Only metadata is populated on the variant record; value and typed_value stay unset.
+    GenericRecord variantValue = new GenericData.Record(shreddedVariantAvro);
+    variantValue.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(new byte[] {1}));
+    GenericRecord input = new GenericData.Record(inputAvro);
+    input.put("id", 1);
+    input.put("v", variantValue);
+
+    HoodieException ex = assertThrows(HoodieException.class, () -> writeSupport.write(input));
+    assertTrue(ex.getMessage().contains("already-shredded") && ex.getMessage().contains("18931"),
+        "Expected the read-then-reshred guard message, got: " + ex.getMessage());
+  }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java 
b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
index 3c5545a6bf71..a49a11362288 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -185,6 +186,41 @@ public class StringUtils {
     return Stream.of(input.split(delimiter)).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
   }
 
+  /**
+   * Splits {@code input} on commas that are not nested inside parentheses, trimming each
+   * segment and dropping empty ones. Unlike {@link #split(String, String)}, a comma inside a
+   * parenthesised descriptor -- e.g. {@code decimal(15, 1)} or {@code VECTOR(128, DOUBLE)} -- is
+   * kept as part of the segment instead of being treated as a separator.
+   *
+   * <p>Unbalanced input is handled leniently: a {@code ')'} with no matching {@code '('} is
+   * kept in its segment but does not affect nesting, so later top-level commas still split.
+   * An unclosed {@code '('} keeps everything after it in one segment.</p>
+   *
+   * @param input the comma-separated text; may be {@code null}
+   * @return the trimmed, non-empty top-level segments in order; empty if there are none
+   */
+  public static List<String> splitTopLevelCommas(@Nullable String input) {
+    if (isNullOrEmpty(input)) {
+      return Collections.emptyList();
+    }
+    List<String> parts = new ArrayList<>();
+    int depth = 0;
+    int start = 0;
+    for (int i = 0; i < input.length(); i++) {
+      char c = input.charAt(i);
+      if (c == '(') {
+        depth++;
+      } else if (c == ')') {
+        // Clamp at zero: a stray ')' must not push depth negative, otherwise every
+        // following comma would be treated as nested and never split.
+        if (depth > 0) {
+          depth--;
+        }
+      } else if (c == ',' && depth == 0) {
+        addTrimmedNonEmpty(parts, input.substring(start, i));
+        start = i + 1;
+      }
+    }
+    addTrimmedNonEmpty(parts, input.substring(start));
+    return parts;
+  }
+
+  /** Appends the trimmed form of {@code segment} to {@code parts} unless trimming leaves nothing. */
+  private static void addTrimmedNonEmpty(List<String> parts, String segment) {
+    String candidate = segment.trim();
+    if (candidate.isEmpty()) {
+      return;
+    }
+    parts.add(candidate);
+  }
+
   public static String getSuffixBy(String input, int ch) {
     int i = input.lastIndexOf(ch);
     if (i == -1) {
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java 
b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index 43a547744baa..3be3bfe3f998 100644
--- a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -101,6 +101,21 @@ public class TestStringUtils {
     assertEquals("a,", String.join(",", Arrays.asList("a", "")));
   }
 
+  @Test
+  public void testSplitTopLevelCommas() {
+    // Commas inside parentheses must not split the segment (decimal shredding DDL + VECTOR descriptors).
+    assertEquals(Arrays.asList("a int", "b string", "c decimal(15, 1)"),
+        StringUtils.splitTopLevelCommas("a int, b string, c decimal(15, 1)"));
+    assertEquals(Arrays.asList("v1:VECTOR(128, DOUBLE)", "v2:VECTOR(64)"),
+        StringUtils.splitTopLevelCommas("v1:VECTOR(128, DOUBLE), v2:VECTOR(64)"));
+    // Segments are trimmed and empty ones dropped.
+    assertEquals(Arrays.asList("a int", "b string"),
+        StringUtils.splitTopLevelCommas(" a int ,, b string "));
+    // Null / empty / blank input yields an empty list.
+    assertEquals(Collections.emptyList(), StringUtils.splitTopLevelCommas(null));
+    assertEquals(Collections.emptyList(), StringUtils.splitTopLevelCommas(""));
+    // Blank input takes the non-empty path and has its only segment dropped after trimming.
+    assertEquals(Collections.emptyList(), StringUtils.splitTopLevelCommas("   "));
+  }
+
   @Test
   public void testStringNullToEmpty() {
     String str = "This is a test";
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index e364772e0596..3108b96fdd18 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -27,6 +27,10 @@ import org.apache.hudi.internal.schema.HoodieSchemaException
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.schema.{GroupType, MessageType, Type}
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
@@ -542,4 +546,158 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
 
     spark.sql(s"drop table $tableName")
   }
+
+  test("Test Shredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+
+    // Session-level overrides needed to force a shredded write. Their previous values are
+    // restored in the `finally` below so they cannot leak into tests that run later in the
+    // same Spark session (the forced shredding schema in particular is test-only).
+    val shreddingConfs = Seq(
+      "hoodie.parquet.variant.write.shredding.enabled" -> "true",
+      "hoodie.parquet.variant.allow.reading.shredded" -> "true",
+      "hoodie.parquet.variant.force.shredding.schema.for.test" -> "a int, b string")
+
+    // Shredding enabled with forced schema → parquet should have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+        """.stripMargin)
+
+      val previousConfs = shreddingConfs.map { case (key, _) => key -> spark.conf.getOption(key) }
+      try {
+        shreddingConfs.foreach { case (key, value) => spark.sql(s"set $key = $value") }
+
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             |  (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+          """.stripMargin)
+        // Reading shredded variants back needs Spark 4.1+ (spark.sql.variant.allowReadingShredded);
+        // Spark 4.0's reader rejects the 3-field shredded layout (4.0 read support is added later, see
+        // https://github.com/apache/hudi/issues/18931). The shredded write is still validated below.
+        if (HoodieSparkUtils.gteqSpark4_1) {
+          checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+            Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+          )
+        }
+
+        // Verify parquet schema has shredded structure with typed_value
+        val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+        assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+        parquetFiles.foreach { filePath =>
+          val schema = readParquetSchema(filePath)
+          val variantGroup = getFieldAsGroup(schema, "v")
+          assert(variantGroup.containsField("typed_value"),
+            s"Shredded variant should have typed_value field. Schema:\n$variantGroup")
+          val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+          assert(valueField.getRepetition == Type.Repetition.OPTIONAL,
+            "Shredded variant value field should be OPTIONAL")
+          val metadataField = variantGroup.getType(variantGroup.getFieldIndex("metadata"))
+          assert(metadataField.getRepetition == Type.Repetition.REQUIRED,
+            "Shredded variant metadata field should be REQUIRED")
+        }
+      } finally {
+        // Put the session back the way it was, whether or not the assertions passed.
+        previousConfs.foreach {
+          case (key, Some(value)) => spark.conf.set(key, value)
+          case (key, None) => spark.conf.unset(key)
+        }
+      }
+    })
+  }
+
+  test("Test Unshredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+
+    // Session-level override used by this test; its previous value is restored in the
+    // `finally` below so it cannot leak into tests that run later in the same Spark session.
+    val shreddingEnabledKey = "hoodie.parquet.variant.write.shredding.enabled"
+
+    // Shredding disabled parquet should NOT have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+        """.stripMargin)
+
+      val previousValue = spark.conf.getOption(shreddingEnabledKey)
+      try {
+        spark.sql(s"set $shreddingEnabledKey = false")
+
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             |  (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+          """.stripMargin)
+
+        checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+          Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+        )
+
+        // Verify parquet schema does NOT have typed_value
+        val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+        assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+        parquetFiles.foreach { filePath =>
+          val schema = readParquetSchema(filePath)
+          val variantGroup = getFieldAsGroup(schema, "v")
+          assert(!variantGroup.containsField("typed_value"),
+            s"Non-shredded variant should NOT have typed_value field. Schema:\n$variantGroup")
+          val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+          assert(valueField.getRepetition == Type.Repetition.REQUIRED,
+            "Non-shredded variant value field should be REQUIRED")
+        }
+
+        // Verify data can still be read back for the non-shredded case
+        checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+          Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+        )
+      } finally {
+        // Put the session back the way it was, whether or not the assertions passed.
+        previousValue match {
+          case Some(value) => spark.conf.set(shreddingEnabledKey, value)
+          case None => spark.conf.unset(shreddingEnabledKey)
+        }
+      }
+    })
+  }
+
+  /**
+   * Recursively collects every path under `tablePath` that ends with ".parquet" and does not
+   * contain ".hoodie" anywhere in the path string.
+   */
+  private def listDataParquetFiles(tablePath: String): Seq[String] = {
+    val root = new HadoopPath(tablePath)
+    val fs = FileSystem.get(root.toUri, spark.sparkContext.hadoopConfiguration)
+    val remoteFiles = fs.listFiles(root, true)
+    // Drain the Hadoop RemoteIterator eagerly into an immutable collection.
+    Iterator.continually(remoteFiles)
+      .takeWhile(_.hasNext)
+      .map(_.next().getPath.toString)
+      .filter(p => p.endsWith(".parquet") && !p.contains(".hoodie"))
+      .toVector
+  }
+
+  /**
+   * Opens the parquet file at `filePath`, returns the schema (MessageType) recorded in its
+   * footer, and closes the reader before returning.
+   */
+  private def readParquetSchema(filePath: String): MessageType = {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val footerReader =
+      ParquetFileReader.open(HadoopInputFile.fromPath(new HadoopPath(filePath), hadoopConf))
+    try footerReader.getFooter.getFileMetaData.getSchema
+    finally footerReader.close()
+  }
+
+  /**
+   * Looks up `fieldName` in `parent` (a GroupType or MessageType) and returns it as a GroupType.
+   * The lookup goes through getFieldIndex(String) + getType(int) so the call resolves to the
+   * index-based overload, avoiding Scala overload resolution issues.
+   */
+  private def getFieldAsGroup(parent: GroupType, fieldName: String): GroupType =
+    parent.getType(parent.getFieldIndex(fieldName)).asGroupType()
 }
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
new file mode 100644
index 000000000000..a0aa663f9463
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.variant;
+
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantSchema;
+import org.apache.spark.types.variant.VariantShreddingWriter;
+import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult;
+import 
org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Implementation of {@link VariantShreddingProvider} using Spark 4's variant 
parsing library.
+ *
+ * <p>This class bridges the Avro record path and Spark's {@link 
VariantShreddingWriter}
+ * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. It 
converts
+ * the shredded output into Avro {@link GenericRecord}s that can be written via
+ * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p>
+ *
+ * <p>The shredding logic is delegated to {@link 
VariantShreddingWriter#castShredded},
+ * which handles scalar, object, and array shredding including residual value 
construction
+ * for non-matching fields. This class implements the {@link ShreddedResult} 
and
+ * {@link ShreddedResultBuilder} interfaces to collect the shredded components 
into
+ * Avro GenericRecords.</p>
+ */
+public class Spark4VariantShreddingProvider implements 
VariantShreddingProvider {
+
+  /**
+   * Shreds one unshredded variant record into the layout described by {@code shreddedSchema}.
+   *
+   * @param unshreddedVariant record whose {@code value} and {@code metadata} fields are read
+   *                          as {@link ByteBuffer}s
+   * @param shreddedSchema    Avro schema of the shredded target structure
+   * @param variantSchema     not used by this implementation
+   * @return the shredded record, or {@code null} when either {@code value} or
+   *         {@code metadata} is {@code null}
+   */
+  @Override
+  public GenericRecord shredVariantRecord(
+      GenericRecord unshreddedVariant,
+      Schema shreddedSchema,
+      HoodieSchema.Variant variantSchema) {
+
+    ByteBuffer rawValue = (ByteBuffer) unshreddedVariant.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD);
+    ByteBuffer rawMetadata = (ByteBuffer) unshreddedVariant.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+
+    boolean hasBothParts = rawValue != null && rawMetadata != null;
+    if (!hasBothParts) {
+      return null;
+    }
+
+    Variant source = new Variant(toByteArray(rawValue), toByteArray(rawMetadata));
+
+    // Translate the Avro layout into Spark's VariantSchema; the builder records the Avro
+    // schema for each level so results can be materialised as GenericRecords.
+    AvroShreddedResultBuilder resultBuilder = new AvroShreddedResultBuilder();
+    VariantSchema targetLayout = buildVariantSchema(shreddedSchema, true, resultBuilder);
+
+    // The shredding itself is done by Spark's VariantShreddingWriter.
+    ShreddedResult shredded = VariantShreddingWriter.castShredded(source, targetLayout, resultBuilder);
+    return ((AvroShreddedResult) shredded).toGenericRecord();
+  }
+
+  /**
+   * Builds a {@link VariantSchema} from an Avro {@link Schema} representing a
+   * shredded variant structure ({@code value}, {@code metadata}, {@code 
typed_value}).
+   *
+   * <p>This method also registers the Avro schema mapping in the builder so 
that
+   * {@link AvroShreddedResultBuilder#createEmpty} can create results with the
+   * correct Avro schema at each nesting level.</p>
+   *
+   * <p>Slot indices are assigned by counting the fields that are present, always in the
+   * order {@code value}, {@code metadata}, {@code typed_value}; the positions the fields
+   * occupy in {@code avroSchema} are not consulted. Only a top-level call records a
+   * metadata index; at nested levels a {@code metadata} field, if present, still counts
+   * toward the field total but its index is reported as {@code -1}.</p>
+   *
+   * <p>The type of {@code typed_value} (after unwrapping a nullable union) selects the
+   * shredding mode: {@code RECORD} recurses into each field as an object field,
+   * {@code ARRAY} recurses into the element type, and anything else is mapped with
+   * {@link #avroTypeToScalarType(Schema)}.</p>
+   *
+   * @param avroSchema Avro record schema of the current shredding level
+   * @param isTopLevel {@code true} only for the outermost variant structure
+   * @param builder    receives the {@code VariantSchema -> Avro schema} registration for
+   *                   this level and every nested level
+   * @return the {@link VariantSchema} describing this level
+   */
+  private VariantSchema buildVariantSchema(Schema avroSchema, boolean 
isTopLevel,
+                                           AvroShreddedResultBuilder builder) {
+    Schema.Field valueField = 
avroSchema.getField(HoodieSchema.Variant.VARIANT_VALUE_FIELD);
+    Schema.Field metadataField = 
avroSchema.getField(HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+    Schema.Field typedValueField = 
avroSchema.getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+
+    int fieldCount = 0;
+    int variantIdx = valueField != null ? fieldCount++ : -1;
+    int topLevelMetadataIdx;
+    if (metadataField != null && isTopLevel) {
+      topLevelMetadataIdx = fieldCount++;
+    } else {
+      topLevelMetadataIdx = -1;
+      if (metadataField != null) {
+        fieldCount++;
+      }
+    }
+    int typedIdx = typedValueField != null ? fieldCount++ : -1;
+    int numFields = fieldCount;
+
+    VariantSchema.ScalarType scalarSchema = null;
+    VariantSchema.ObjectField[] objectSchema = null;
+    VariantSchema arraySchema = null;
+
+    if (typedValueField != null) {
+      Schema tvSchema = unwrapNullable(typedValueField.schema());
+
+      switch (tvSchema.getType()) {
+        case RECORD:
+          // Object shredding: each field has a nested {value, typed_value} 
sub-struct
+          List<VariantSchema.ObjectField> fields = new ArrayList<>();
+          for (Schema.Field field : tvSchema.getFields()) {
+            Schema fieldSchema = unwrapNullable(field.schema());
+            VariantSchema subSchema = buildVariantSchema(fieldSchema, false, 
builder);
+            fields.add(new VariantSchema.ObjectField(field.name(), subSchema));
+          }
+          objectSchema = fields.toArray(new VariantSchema.ObjectField[0]);
+          break;
+
+        case ARRAY:
+          // Array shredding: elements follow the shredding schema
+          Schema elementSchema = unwrapNullable(tvSchema.getElementType());
+          arraySchema = buildVariantSchema(elementSchema, false, builder);
+          break;
+
+        default:
+          // Scalar shredding
+          // NOTE(review): avroTypeToScalarType returns null for Avro types it does not map,
+          // leaving scalar/object/array schemas all null while typedIdx >= 0 -- confirm the
+          // Spark writer treats that as "nothing to shred" rather than failing.
+          scalarSchema = avroTypeToScalarType(tvSchema);
+          break;
+      }
+    }
+
+    VariantSchema result = new VariantSchema(
+        typedIdx, variantIdx, topLevelMetadataIdx, numFields,
+        scalarSchema, objectSchema, arraySchema);
+
+    builder.registerSchema(result, avroSchema);
+
+    return result;
+  }
+
+  /**
+   * Translates an Avro scalar {@link Schema} into the matching {@link VariantSchema.ScalarType}.
+   *
+   * <p>A logical type annotation wins over the underlying primitive: decimal, date,
+   * timestamp (micros and millis), local timestamp (micros and millis) and uuid are
+   * recognised. Any other logical type is ignored and the primitive type decides.</p>
+   *
+   * <p>NOTE(review): the millis-precision timestamp logical types map to the same Spark
+   * types as their micros counterparts -- confirm the values written for those columns are
+   * expressed in the unit the column declares.</p>
+   *
+   * @param schema a non-union Avro schema
+   * @return the scalar type, or {@code null} when the Avro type has no scalar mapping
+   */
+  private VariantSchema.ScalarType avroTypeToScalarType(Schema schema) {
+    LogicalType logicalType = schema.getLogicalType();
+
+    // Logical types take precedence over the primitive they annotate.
+    if (logicalType instanceof LogicalTypes.Decimal) {
+      LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+      return new VariantSchema.DecimalType(decimal.getPrecision(), decimal.getScale());
+    }
+    if (logicalType != null) {
+      switch (logicalType.getName()) {
+        case "date":
+          return new VariantSchema.DateType();
+        case "timestamp-micros":
+        case "timestamp-millis":
+          return new VariantSchema.TimestampType();
+        case "local-timestamp-micros":
+        case "local-timestamp-millis":
+          return new VariantSchema.TimestampNTZType();
+        case "uuid":
+          return new VariantSchema.UuidType();
+        default:
+          // Unrecognised logical type: fall through to the primitive mapping below.
+          break;
+      }
+    }
+
+    switch (schema.getType()) {
+      case BOOLEAN:
+        return new VariantSchema.BooleanType();
+      case INT:
+        return new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT);
+      case LONG:
+        return new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG);
+      case FLOAT:
+        return new VariantSchema.FloatType();
+      case DOUBLE:
+        return new VariantSchema.DoubleType();
+      case STRING:
+        return new VariantSchema.StringType();
+      case BYTES:
+      case FIXED:
+        return new VariantSchema.BinaryType();
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Strips the nullable wrapper from an Avro schema.
+   *
+   * @param schema any Avro schema
+   * @return for a UNION, its first branch that is not NULL (or the union itself when every
+   *         branch is NULL); for any other type, {@code schema} unchanged
+   */
+  private static Schema unwrapNullable(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+    return schema.getTypes().stream()
+        .filter(branch -> branch.getType() != Schema.Type.NULL)
+        .findFirst()
+        .orElse(schema);
+  }
+
+  /**
+   * Returns the readable bytes of {@code buffer} as an array without moving its position.
+   * When the buffer is backed by an accessible array and spans that array exactly, the
+   * backing array itself is returned (no copy); otherwise the remaining bytes are copied.
+   *
+   * @param buffer source buffer; its position and limit are left untouched
+   * @return the backing array or a fresh copy of the remaining bytes
+   */
+  private static byte[] toByteArray(ByteBuffer buffer) {
+    if (buffer.hasArray()) {
+      boolean spansWholeArray = buffer.position() == 0
+          && buffer.arrayOffset() == 0
+          && buffer.remaining() == buffer.array().length;
+      if (spansWholeArray) {
+        return buffer.array();
+      }
+    }
+    // Read through a duplicate so the caller's buffer position is not consumed.
+    byte[] copy = new byte[buffer.remaining()];
+    buffer.duplicate().get(copy);
+    return copy;
+  }
+
+  /**
+   * {@link ShreddedResult} implementation that buffers the pieces of one shredded variant
+   * (metadata, residual variant value, and a scalar / object / array typed value) and
+   * assembles them into an Avro {@link GenericRecord} shaped by the Avro schema it was
+   * created with.
+   */
+  static class AvroShreddedResult implements ShreddedResult {
+    private final VariantSchema variantSchema;
+    private final Schema avroSchema;
+
+    private byte[] metadata;
+    private byte[] variantValue;
+    private Object scalarValue;
+    private AvroShreddedResult[] objectFields;
+    private AvroShreddedResult[] arrayElements;
+
+    AvroShreddedResult(VariantSchema variantSchema, Schema avroSchema) {
+      this.variantSchema = variantSchema;
+      this.avroSchema = avroSchema;
+    }
+
+    /** Stores the per-element results of a shredded array. */
+    @Override
+    public void addArray(ShreddedResult[] array) {
+      this.arrayElements = new AvroShreddedResult[array.length];
+      castInto(this.arrayElements, array);
+    }
+
+    /** Stores the per-field results of a shredded object, in the order they were supplied. */
+    @Override
+    public void addObject(ShreddedResult[] values) {
+      this.objectFields = new AvroShreddedResult[values.length];
+      castInto(this.objectFields, values);
+    }
+
+    /** Stores the variant-encoded binary value. */
+    @Override
+    public void addVariantValue(byte[] result) {
+      this.variantValue = result;
+    }
+
+    /** Stores the scalar typed value. */
+    @Override
+    public void addScalar(Object result) {
+      this.scalarValue = result;
+    }
+
+    /** Stores the variant metadata bytes. */
+    @Override
+    public void addMetadata(byte[] result) {
+      this.metadata = result;
+    }
+
+    /**
+     * Assembles the collected components into an Avro {@link GenericRecord} of this
+     * result's Avro schema.
+     *
+     * <p>The metadata field is written only when metadata was collected. The value field,
+     * when the schema declares one, is always written (null if no variant value was
+     * collected). The typed_value field, when declared, is filled from the scalar, object,
+     * or array component, checked in that order, and is null when none was collected.
+     */
+    GenericRecord toGenericRecord() {
+      GenericRecord assembled = new GenericData.Record(avroSchema);
+
+      // Metadata is only collected at the top level, so nested records skip this put.
+      if (metadata != null) {
+        assembled.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(metadata));
+      }
+
+      // Variant binary: the unshredded value or whatever was left over after shredding.
+      if (avroSchema.getField(HoodieSchema.Variant.VARIANT_VALUE_FIELD) != null) {
+        ByteBuffer wrappedValue = variantValue == null ? null : ByteBuffer.wrap(variantValue);
+        assembled.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, wrappedValue);
+      }
+
+      Schema.Field typedValueField = avroSchema.getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+      if (typedValueField != null) {
+        assembled.put(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD, buildTypedValue(typedValueField));
+      }
+      return assembled;
+    }
+
+    /**
+     * Produces the Avro datum for the typed_value field: the converted scalar, a record of
+     * shredded object fields, a list of shredded array elements, or null when nothing was
+     * shredded at this level.
+     */
+    private Object buildTypedValue(Schema.Field typedValueField) {
+      if (scalarValue != null) {
+        return convertScalarToAvro(scalarValue, unwrapNullable(typedValueField.schema()));
+      }
+      if (objectFields != null) {
+        GenericRecord shreddedObject = new GenericData.Record(unwrapNullable(typedValueField.schema()));
+        for (int idx = 0; idx < objectFields.length; idx++) {
+          // Every declared field gets a sub-record, even when the field was missing from the
+          // variant (the struct itself is non-null; its members may be null).
+          shreddedObject.put(variantSchema.objectSchema[idx].fieldName, objectFields[idx].toGenericRecord());
+        }
+        return shreddedObject;
+      }
+      if (arrayElements != null) {
+        List<GenericRecord> shreddedElements = new ArrayList<>(arrayElements.length);
+        for (AvroShreddedResult element : arrayElements) {
+          shreddedElements.add(element.toGenericRecord());
+        }
+        return shreddedElements;
+      }
+      return null;
+    }
+
+    /** Copies {@code source} into {@code target} element by element, casting each entry. */
+    private static void castInto(AvroShreddedResult[] target, ShreddedResult[] source) {
+      int idx = 0;
+      for (ShreddedResult item : source) {
+        target[idx++] = (AvroShreddedResult) item;
+      }
+    }
+
+    /**
+     * Adapts a scalar produced by the shredding writer to the Java type expected for the
+     * given Avro schema: byte arrays are wrapped in a {@link ByteBuffer}, UUIDs become their
+     * string form, and narrower integers are widened to match an INT or LONG schema. Any
+     * other value is returned as-is.
+     *
+     * <p>NOTE(review): a {@code byte[]} is wrapped as a ByteBuffer regardless of the target
+     * Avro type, including FIXED; confirm the downstream writer accepts that.
+     */
+    private static Object convertScalarToAvro(Object value, Schema avroSchema) {
+      if (value instanceof byte[]) {
+        return ByteBuffer.wrap((byte[]) value);
+      }
+      if (value instanceof UUID) {
+        return value.toString();
+      }
+
+      boolean isByteOrShort = value instanceof Byte || value instanceof Short;
+      switch (avroSchema.getType()) {
+        case INT:
+          if (isByteOrShort) {
+            return ((Number) value).intValue();
+          }
+          break;
+        case LONG:
+          if (isByteOrShort || value instanceof Integer) {
+            return ((Number) value).longValue();
+          }
+          break;
+        default:
+          break;
+      }
+      // Everything else (BigDecimal, Boolean, String, Integer, Long, Float, Double, ...)
+      // is passed through untouched.
+      return value;
+    }
+  }
+
+  /**
+   * {@link ShreddedResultBuilder} that hands out {@link AvroShreddedResult} instances, each
+   * paired with the Avro schema previously registered for the requested {@link VariantSchema}.
+   */
+  static class AvroShreddedResultBuilder implements ShreddedResultBuilder {
+    // Identity-keyed: a lookup only matches the exact VariantSchema instance that was registered.
+    private final Map<VariantSchema, Schema> schemaMap = new IdentityHashMap<>();
+
+    /** Always reports that numeric scale changes are allowed. */
+    @Override
+    public boolean allowNumericScaleChanges() {
+      return true;
+    }
+
+    /** Associates {@code avroSchema} with {@code variantSchema} for later {@link #createEmpty} calls. */
+    void registerSchema(VariantSchema variantSchema, Schema avroSchema) {
+      schemaMap.put(variantSchema, avroSchema);
+    }
+
+    /**
+     * Creates an empty result bound to the Avro schema registered for {@code schema}.
+     *
+     * @throws IllegalStateException if no Avro schema was registered for that instance
+     */
+    @Override
+    public ShreddedResult createEmpty(VariantSchema schema) {
+      Schema registered = schemaMap.get(schema);
+      if (registered != null) {
+        return new AvroShreddedResult(schema, registered);
+      }
+      throw new IllegalStateException(
+          "No Avro schema registered for VariantSchema: " + schema);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index ea6e96943a69..a679c7e62bab 100644
--- 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -180,18 +180,20 @@ abstract class BaseSpark4Adapter extends SparkAdapter 
with Logging {
   override def isDataTypeEqualForPhysicalSchema(requiredType: DataType, 
fileType: DataType): Option[Boolean] = {
     /**
      * Checks if a StructType is the physical representation of VariantType in 
Parquet.
-     * VariantType is stored in Parquet as a struct with two binary fields: 
"metadata" and "value".
+     * VariantType is stored in Parquet as a struct with binary fields: 
"metadata" and "value".
+     * Supports both unshredded (2 fields) and shredded (3 fields with 
"typed_value") layouts.
      */
+    // TODO(voon) parquet-1.16: replace this name/arity shape heuristic with a 
VariantLogicalTypeAnnotation check once all supported parquet versions are >= 
1.16.
     def isVariantPhysicalSchema(structType: StructType): Boolean = {
-      if (structType.fields.length != 2) {
-        false
-      } else {
-        val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
-        fieldMap.contains(HoodieSchema.Variant.VARIANT_VALUE_FIELD) &&
-          fieldMap.contains(HoodieSchema.Variant.VARIANT_METADATA_FIELD) &&
-          fieldMap(HoodieSchema.Variant.VARIANT_VALUE_FIELD) == BinaryType &&
-          fieldMap(HoodieSchema.Variant.VARIANT_METADATA_FIELD) == BinaryType
-      }
+      val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
+      val hasRequiredFields = 
fieldMap.contains(HoodieSchema.Variant.VARIANT_VALUE_FIELD) &&
+        fieldMap.contains(HoodieSchema.Variant.VARIANT_METADATA_FIELD) &&
+        fieldMap(HoodieSchema.Variant.VARIANT_VALUE_FIELD) == BinaryType &&
+        fieldMap(HoodieSchema.Variant.VARIANT_METADATA_FIELD) == BinaryType
+      val isUnshredded = structType.fields.length == 2
+      val isShredded = structType.fields.length == 3 &&
+        fieldMap.contains(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)
+      hasRequiredFields && (isUnshredded || isShredded)
     }
 
     // Handle VariantType comparisons
@@ -261,6 +263,7 @@ abstract class BaseSpark4Adapter extends SparkAdapter with 
Logging {
     applyVariantLogicalType(builder).named(fieldName)
   }
 
+  // TODO(#18935) drop-spark4.0: when all remaining 4.x adapters are parquet 
1.16+, apply variantType() in this base and delete the no-op default plus the 
Spark4_1Adapter override.
   protected def applyVariantLogicalType(builder: 
Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = builder
 
   override def isVariantShreddingStruct(structType: StructType): Boolean = {
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
index 02a39037822b..3eb1c39dc109 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
@@ -31,7 +31,7 @@ import java.time.ZoneId
 
 import scala.collection.JavaConverters._
 
-// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark 
4.1+ reads
+// TODO(#18935): Delete this file when the hudi-spark4.0.x module is removed. 
Spark 4.1+ reads
 //  variant fields by name via SPARK-54410, so the reorder workaround below is 
no longer
 //  needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its 
converters
 //  array in hardcoded [value, metadata] order, then indexes by schema 
position. If the

Reply via email to