This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3648230469f5 feat(variant): support reading shredded variant base files via the AVRO reader (#18938)
3648230469f5 is described below

commit 3648230469f5e25663374d7bcc2dfd12b9100e31
Author: voonhous <[email protected]>
AuthorDate: Thu Jun 25 02:41:02 2026 +0800

    feat(variant): support reading shredded variant base files via the AVRO reader (#18938)
    
    * feat(variant): reconstruct unshredded variant on the AVRO read path (#18931)
    
    Compaction/clustering reading an already-shredded base file via the AVRO record
    path now rebuilds the unshredded {metadata, value} variant before records reach
    the merger/writer, replacing the prior fail-fast guard.
    
    - VariantShreddingProvider: add rebuildVariantRecord (inverse of shredVariantRecord);
      Spark4VariantShreddingProvider implements it via Spark's ShreddingUtils.rebuild over
      Avro-backed ShreddedRow rows.
    - HoodieAvroParquetReader: detect shredded variant columns, read them at the file's
      shredded schema, and reconstruct to unshredded per record (VariantReconstruction);
      provider loaded via config or classpath, gated on
      hoodie.parquet.variant.allow.reading.shredded.
    - Extract stripVariantShredding into VariantSchemaUtils (shared by reader/writer).
    - Remove the read-then-reshred guard from HoodieAvroWriteSupport and its unit test.
    - Extend the MOR compaction test in TestVariantDataType to write shredded and read
      back (AVRO reconstruction + SPARK native via withRecordType).
    
    * test(variant): gate shredded MOR compaction test on Spark >= 4.1
    
    The compaction base file is written by the AVRO shredding writer as
    [metadata, value, typed_value]. Spark 4.0's reorderVariantFields rebuilds
    that group as [value, metadata] and drops typed_value, so the native read
    after compaction fails with MALFORMED_VARIANT. Spark 4.1+ reads variant
    fields by name (SPARK-54410) and reconstructs correctly.
    
    Fixes the spark4.0 leg of #18931.
    
    * review(variant): drop no-op makeNullable in applyForcedShreddingSchema
    
    withSchema(replacement) rebuilds the field from replacement, which already
    mirrors the original nullability, so the intermediate makeNullable() was a
    no-op (and could reset field order for non-nullable fields).
    
    * review(variant): centralize shredding-provider classpath detection
    
    VariantReconstruction and HoodieAvroFileWriterFactory each hardcoded the
    provider FQN and a Class.forName lookup. Move the candidate list and detection
    into a shared VariantShreddingProvider.detectProviderClassOnClasspath(), so a
    new provider impl is registered in one place.
    
    * review(variant): rename VariantReconstruction to HoodieVariantReconstruction
    
    Every other class in org.apache.hudi.io.storage.hadoop carries the Hoodie
    prefix (HoodieAvroParquetReader, HoodieHadoopIOFactory, ...); this was the lone
    exception. Package-private with a single caller, so the rename is contained.
    
    * review(variant): rename nonNull to unwrapNullable in HoodieVariantReconstruction
    
    The helper unwraps a nullable union type; it is not a null-assertion guard. The new
    name reflects what it does at each call site.
    
    * fix(variant): fail fast when a shredded base file has no reconstruction provider
    
    When a requested column is shredded in the base file and reading shredded
    variants is enabled but no provider is loadable, create() returned null and the
    reader fell back to the unshredded requested schema. parquet-avro then reads
    value/metadata and drops typed_value, silently corrupting variants whose payload
    lived in typed_value. Throw HoodieException instead, mirroring the write path.
    Removes the now-unused logger.
    
    * fix(variant): fail fast on missing metadata in rebuildVariantRecord
    
    metadata is REQUIRED in the shredded parquet schema, so a null metadata on the
    read path means a malformed base file. Returning null let the caller null out the
    whole variant column, silently dropping data; throw HoodieException instead. The
    separate null-record guard stays (genuine null variant passes through).
    
    * test(variant): correct stale #18931 reference in shredded read-back comment
    
    The Spark SQL read-back is gated on Spark 4.1+ because Spark 4.0's native parquet
    reader rejects the 3-field shredded layout (SPARK-54410) - not because of #18931.
    #18931 is the AVRO reader reconstruction and does not affect the Spark native read,
    so drop the misattributed pointer.
    
    * review(variant): document why AvroVariantRow scalar getters read typed_value directly
    
    The scalar getters ignore ordinal and read typed_value directly while
    isNullAt/getBinary go through fieldNameFor(ordinal). Spark only calls the
    scalar getters for the scalar typed_value, so the asymmetry is intentional;
    add a comment so it does not read as an oversight.
    
    * fix(variant): decline millisecond-precision timestamps in shredding type mapping
    
    avroTypeToScalarType mapped timestamp-millis / local-timestamp-millis to the
    micros-based TimestampType / TimestampNTZType. The Variant binary spec stores
    timestamps in microseconds, so a millis-precision typed_value would be read back
    as micros and scaled 1000x. Unreachable today (Hudi/Spark only produce micros
    typed_value), but decline to shred millis as a scalar so it can never silently
    corrupt; the value stays in the residual unshredded binary.
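
    The 1000x error described above can be illustrated with plain JDK types. This is
    a minimal sketch, not Hudi or Spark code: the class MillisAsMicros and the helper
    decodeAsMicros are hypothetical names, and java.time.Instant stands in for the
    micros-based timestamp types.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration only; none of these names exist in Hudi or Spark.
final class MillisAsMicros {

  /** Decodes a raw long the way a microsecond-based timestamp type would. */
  static Instant decodeAsMicros(long raw) {
    return Instant.EPOCH.plus(raw, ChronoUnit.MICROS);
  }

  public static void main(String[] args) {
    Instant written = Instant.parse("2026-06-25T00:00:00Z");

    // What a timestamp-millis column physically holds.
    long storedMillis = written.toEpochMilli();

    // Reading the millis value as if it were micros shrinks the elapsed time by 1000x.
    Instant misread = decodeAsMicros(storedMillis);

    // Rescaling to micros before decoding recovers the original instant.
    Instant correct = decodeAsMicros(Math.multiplyExact(storedMillis, 1000L));

    System.out.println("written = " + written);
    System.out.println("misread = " + misread);
    System.out.println("correct = " + correct);
  }
}
```

    Declining to shred millis values, as this commit does, avoids needing any such
    rescaling on the read path.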
    
    * fix(variant): fail fast on disabled shredded read instead of dropping typed_value
    
    HoodieVariantReconstruction.create returned null on allow.reading.shredded=false
    before detecting whether the file actually has shredded variant columns. A
    shredded base file was then read at the unshredded requested schema, dropping
    typed_value -- rows whose payload lived there came back with a null value
    (silent corruption), which compaction/clustering could then persist.
    
    Move the flag check below the shredded-column detection: return null when no
    shredded variant column is present (nothing to reconstruct, regardless of the
    flag), and throw when shredded columns are present but reading is disabled -
    mirroring the no-provider branch and Spark's allowReadingShredded=false, which
    rejects shredded reads rather than discarding data. Adds a regression test.
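
    The ordering described above can be sketched as a small decision function. This
    is a simplified illustration under stated assumptions, not the actual
    HoodieVariantReconstruction.create: the class, enum, and parameter names are
    hypothetical, IllegalStateException stands in for HoodieException, and the
    relative order of the flag check and the provider check is assumed.

```java
// Hypothetical sketch of the fail-fast ordering; not the real Hudi implementation.
final class ShreddedReadDecision {

  enum Outcome { NO_RECONSTRUCTION, RECONSTRUCT }

  static Outcome decide(boolean fileHasShreddedVariant,
                        boolean readingShreddedAllowed,
                        boolean providerAvailable) {
    // 1. Detection comes first: with no shredded column there is nothing to
    //    reconstruct, whatever the flag says.
    if (!fileHasShreddedVariant) {
      return Outcome.NO_RECONSTRUCTION;
    }
    // 2. Shredded columns exist but reading them is disabled: fail instead of
    //    silently dropping typed_value.
    if (!readingShreddedAllowed) {
      throw new IllegalStateException("Shredded variant columns present but reading shredded is disabled");
    }
    // 3. Shredded columns exist and reading is allowed, but nothing can rebuild them.
    if (!providerAvailable) {
      throw new IllegalStateException("Shredded variant columns present but no shredding provider is loadable");
    }
    return Outcome.RECONSTRUCT;
  }

  public static void main(String[] args) {
    System.out.println(decide(false, false, false));
    System.out.println(decide(true, true, true));
    try {
      decide(true, false, true);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

    Both failure branches throw rather than return the "nothing to do" result, so a
    shredded file can never be read at the unshredded schema by accident.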
    
    * test(variant): cover the Avro shredded-read reconstruction directly
    
    The Spark MOR SQL test ("...triggers compaction") does not exercise the AVRO
    reconstruction it was meant to cover: withRecordType only swaps the record
    merger and log-block format, and Spark compaction reads base files via
    SparkFileFormatInternalRowReaderContext (InternalRow), so HoodieAvroParquetReader
    - where HoodieVariantReconstruction / rebuildVariantRecord run - is never reached.
    
    Add TestSpark4VariantShreddingProvider: a direct shred -> rebuild round-trip over
    the provider, covering scalar (numeric/string/boolean/decimal), object, and array
    variant shapes. This exercises rebuildVariantRecord and the AvroVariantRow/
    AvroObjectRow/AvroArrayRow accessors that the SQL test cannot reach. Also correct
    the misleading comment on the SQL test to state it covers the Spark InternalRow
    (native) read of a shredded base file, not the Avro path.
    
    * fix(variant): copy non-target fields when building the reconstruction intermediate schema
    
    HoodieVariantReconstruction.create reused the requested field instances for
    non-target (non-variant) columns when assembling the intermediate read schema.
    Those Avro Fields are already bound to the requested record, so Schema.setFields
    throws "Field already used" as soon as the requested record has any non-variant
    column alongside a shredded variant - i.e. every real table. The target branch
    avoided this only because withSchema() builds a fresh Field; do the same for
    non-target fields.
    
    This was latent because nothing exercised the successful create -> reconstruct
    path: Spark compaction reads base files via the InternalRow reader, and the
    Java/Flink AVRO path fails fast first (no provider). Add
    TestHoodieVariantReconstructionRoundTrip (a create -> reconstruct round-trip with
    the real provider, asserting variant rebuild + non-variant pass-through and field
    alignment), which is what surfaced the bug.
    
    * docs(variant): correct rebuildVariantRecord javadoc to match the impl
    
    The @return claimed null is returned when metadata is missing, but the Spark4
    impl returns null only when shreddedVariant is null and throws HoodieException
    on missing metadata. Fix the @return and add a matching @throws.
---
 .../org/apache/hudi/avro/VariantSchemaUtils.java   |  85 ++++++
 .../apache/hudi/avro/VariantShreddingProvider.java |  45 ++++
 .../apache/hudi/avro/HoodieAvroWriteSupport.java   |  93 +------
 .../hadoop/HoodieAvroFileWriterFactory.java        |  21 +-
 .../io/storage/hadoop/HoodieAvroParquetReader.java |  21 +-
 .../hadoop/HoodieVariantReconstruction.java        | 194 ++++++++++++++
 .../io/hadoop/TestHoodieBaseParquetWriter.java     |  51 ----
 .../hadoop/TestHoodieVariantReconstruction.java    |  75 ++++++
 .../sql/hudi/dml/schema/TestVariantDataType.scala  |  22 +-
 .../variant/Spark4VariantShreddingProvider.java    | 287 ++++++++++++++++++++-
 .../TestHoodieVariantReconstructionRoundTrip.java  | 113 ++++++++
 .../TestSpark4VariantShreddingProvider.java        | 118 +++++++++
 12 files changed, 949 insertions(+), 176 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java
new file mode 100644
index 000000000000..aec64fa0cf3d
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shared helpers for converting between shredded and unshredded variant schemas.
+ * Used by both the write path ({@link HoodieAvroWriteSupport}) and the read path
+ * (variant reconstruction in the parquet reader).
+ */
+public class VariantSchemaUtils {
+
+  private VariantSchemaUtils() {
+  }
+
+  /**
+   * Strips shredding from top-level variant fields in {@code schema}, replacing each shredded
+   * variant with its unshredded form (dropping {@code typed_value}). Non-variant fields and
+   * already-unshredded variants pass through unchanged; returns {@code schema} as-is when nothing
+   * changes.
+   */
+  public static HoodieSchema stripVariantShredding(HoodieSchema schema) {
+    if (schema.getType() != HoodieSchemaType.RECORD) {
+      return schema;
+    }
+
+    List<HoodieSchemaField> fields = schema.getFields();
+    List<HoodieSchemaField> newFields = new ArrayList<>();
+    boolean changed = false;
+
+    for (HoodieSchemaField field : fields) {
+      HoodieSchema fieldSchema = field.schema();
+      boolean wasNullable = fieldSchema.isNullable();
+      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
+
+      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+        HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
+        if (variant.isShredded()) {
+          HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
+              unwrapped.getAvroSchema().getName(),
+              unwrapped.getAvroSchema().getNamespace(),
+              unwrapped.getAvroSchema().getDoc());
+          HoodieSchema replacement = wasNullable ? HoodieSchema.createNullable(unshredded) : unshredded;
+          newFields.add(field.withSchema(replacement));
+          changed = true;
+          continue;
+        }
+      }
+      newFields.add(field);
+    }
+
+    if (!changed) {
+      return schema;
+    }
+
+    return HoodieSchema.createRecord(
+        schema.getAvroSchema().getName(),
+        schema.getAvroSchema().getNamespace(),
+        schema.getAvroSchema().getDoc(),
+        newFields);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java b/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
index 90ffbee1e53f..910554c50a40 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
@@ -37,6 +37,29 @@ import org.apache.avro.generic.GenericRecord;
  */
 public interface VariantShreddingProvider {
 
+  /**
+   * Provider implementations to auto-detect on the classpath, in priority order, when the variant
+   * shredding provider class is not set explicitly via config. Sole provider today: variant
+   * shredding currently requires Spark 4.0+.
+   */
+  String[] CLASSPATH_CANDIDATES = {"org.apache.hudi.variant.Spark4VariantShreddingProvider"};
+
+  /**
+   * Returns the fully-qualified class name of the first {@link VariantShreddingProvider}
+   * implementation available on the classpath, or {@code null} if none is present.
+   */
+  static String detectProviderClassOnClasspath() {
+    for (String candidate : CLASSPATH_CANDIDATES) {
+      try {
+        Class.forName(candidate);
+        return candidate;
+      } catch (ClassNotFoundException | NoClassDefFoundError e) {
+        // Provider not on classpath; try the next candidate.
+      }
+    }
+    return null;
+  }
+
   /**
    * Transform an unshredded variant GenericRecord into a shredded one.
    * <p>
@@ -63,4 +86,26 @@ public interface VariantShreddingProvider {
       GenericRecord unshreddedVariant,
       Schema shreddedSchema,
       HoodieSchema.Variant variantSchema);
+
+  /**
+   * Reconstruct an unshredded variant GenericRecord from a shredded one (the inverse of
+   * {@link #shredVariantRecord}).
+   * <p>
+   * Used on the read path: records read from an already-shredded base file (compaction/clustering)
+   * arrive with {@code typed_value} populated. This rebuilds the full variant binary so the record
+   * presents the standard unshredded {@code {metadata, value}} shape before it reaches the
+   * merger/writer.
+   *
+   * @param shreddedVariant  GenericRecord with {value, metadata, typed_value} read from a shredded base file
+   * @param shreddedSchema   the Avro schema of {@code shreddedVariant} (carries typed_value)
+   * @param unshreddedSchema target Avro schema with {value: ByteBuffer, metadata: ByteBuffer}
+   * @return a GenericRecord conforming to {@code unshreddedSchema} with the full reconstructed
+   *         variant binary in {@code value}, or {@code null} when {@code shreddedVariant} is null
+   * @throws org.apache.hudi.exception.HoodieException if {@code shreddedVariant} is missing its
+   *         required {@code metadata} field (a malformed shredded base file)
+   */
+  GenericRecord rebuildVariantRecord(
+      GenericRecord shreddedVariant,
+      Schema shreddedSchema,
+      Schema unshreddedSchema);
 }
\ No newline at end of file
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index 3cd3c965388b..497aa3008406 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -88,13 +88,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
    */
   private final VariantShreddingProvider shreddingProvider;
 
-  /**
-   * Names of all variant-typed top-level fields, regardless of shredding. Used to fail fast on the
-   * not-yet-supported read-then-reshred path (compaction/clustering over an already-shredded base
-   * file). See https://github.com/apache/hudi/issues/18931.
-   */
-  private final String[] variantFieldNames;
-
   public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, Option<BloomFilter> bloomFilterOpt,
                                 Properties properties) {
     this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema, properties), bloomFilterOpt, properties);
@@ -115,10 +108,8 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
         properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
             String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
 
-    // Single pass over the effective schema fields: collect every variant-typed field name (for the
-    // read-then-reshred guard) and, when shredding is enabled, the subset that needs shredding.
+    // When shredding is enabled, collect the variant fields that need shredding.
     Map<Integer, ShreddedVariantField> shreddedFields = new LinkedHashMap<>();
-    List<String> variantNames = new ArrayList<>();
 
     if (effectiveSchema.getType() == HoodieSchemaType.RECORD) {
       List<HoodieSchemaField> fields = effectiveSchema.getFields();
@@ -132,7 +123,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
         if (fieldSchema.getType() != HoodieSchemaType.VARIANT) {
           continue;
         }
-        variantNames.add(field.name());
         if (variantWriteShreddingEnabled) {
           HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
           if (variant.isShredded() && variant.getTypedValueField().isPresent()) {
@@ -149,7 +139,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
     }
 
     this.shreddedVariantFields = shreddedFields;
-    this.variantFieldNames = variantNames.toArray(new String[0]);
 
     // Load shredding provider via reflection if needed
     if (!shreddedVariantFields.isEmpty()) {
@@ -194,7 +183,7 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
       // Schemas from clustering/compaction may still be shredded (read from on-disk Parquet files
       // written with shredding enabled), so we need to strip typed_value when shredding
       // is disabled.
-      return stripVariantShredding(hoodieSchema);
+      return VariantSchemaUtils.stripVariantShredding(hoodieSchema);
     }
 
     // Check if a forced shredding schema is configured
@@ -219,9 +208,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
   @SuppressWarnings("unchecked")
   @Override
   public void write(T record) {
-    if (variantFieldNames.length > 0) {
-      assertInputNotAlreadyShredded((IndexedRecord) record);
-    }
     if (!shreddedVariantFields.isEmpty() && shreddingProvider != null) {
       super.write((T) shredRecord((IndexedRecord) record));
     } else {
@@ -273,32 +259,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
     return shreddedRecord;
   }
 
-  /**
-   * Fails fast on the not-yet-supported read-then-reshred path. Records read from an already-shredded
-   * base file (compaction/clustering) arrive with a populated {@code typed_value} and a possibly-null
-   * {@code value}. The writer has no logic to reconstruct the unshredded variant, so shredding would
-   * silently drop the payload (shredding enabled) and the parquet writer would reject the null at the
-   * REQUIRED {@code value} field (shredding disabled). Reconstruction is tracked in
-   * https://github.com/apache/hudi/issues/18931.
-   */
-  private void assertInputNotAlreadyShredded(IndexedRecord inputRecord) {
-    Schema inputSchema = inputRecord.getSchema();
-    for (String fieldName : variantFieldNames) {
-      Schema.Field field = inputSchema.getField(fieldName);
-      if (field == null) {
-        continue;
-      }
-      Object value = inputRecord.get(field.pos());
-      if (value instanceof GenericRecord
-          && ((GenericRecord) value).getSchema().getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD) != null) {
-        throw new HoodieException("Writing an already-shredded variant field '" + fieldName
-            + "' is not supported yet. Compaction/clustering read a base file written with variant "
-            + "shredding and re-wrote it through the Avro path; the reader does not yet reconstruct "
-            + "the unshredded variant. Tracked in https://github.com/apache/hudi/issues/18931.");
-      }
-    }
-  }
-
   @Override
   public WriteSupport.FinalizedWriteContext finalizeWrite() {
     Map<String, String> extraMetadata =
@@ -365,7 +325,8 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
             shreddedFields);
         HoodieSchema replacement = wasNullable
             ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
-        newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+        // replacement already mirrors the field's original nullability, so withSchema alone suffices.
+        newFields.add(HoodieSchemaUtils.createNewSchemaField(field.withSchema(replacement)));
         changed = true;
       } else {
         newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
@@ -434,52 +395,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
     }
   }
 
-  /**
-   * Strips shredding from variant fields in the schema.
-   * Replaces shredded variant fields with unshredded variants (removing typed_value).
-   */
-  private static HoodieSchema stripVariantShredding(HoodieSchema schema) {
-    if (schema.getType() != HoodieSchemaType.RECORD) {
-      return schema;
-    }
-
-    List<HoodieSchemaField> fields = schema.getFields();
-    List<HoodieSchemaField> newFields = new ArrayList<>();
-    boolean changed = false;
-
-    for (HoodieSchemaField field : fields) {
-      HoodieSchema fieldSchema = field.schema();
-      boolean wasNullable = fieldSchema.isNullable();
-      HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
-
-      if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
-        HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
-        if (variant.isShredded()) {
-          // Replace with unshredded variant
-          HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
-              unwrapped.getAvroSchema().getName(),
-              unwrapped.getAvroSchema().getNamespace(),
-              unwrapped.getAvroSchema().getDoc());
-          HoodieSchema replacement = wasNullable ? HoodieSchema.createNullable(unshredded) : unshredded;
-          newFields.add(field.withSchema(replacement));
-          changed = true;
-          continue;
-        }
-      }
-      newFields.add(field);
-    }
-
-    if (!changed) {
-      return schema;
-    }
-
-    return HoodieSchema.createRecord(
-        schema.getAvroSchema().getName(),
-        schema.getAvroSchema().getNamespace(),
-        schema.getAvroSchema().getDoc(),
-        newFields);
-  }
-
   /**
    * Extracts the non-null type from a union schema.
    */
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
index 4d2630d2e7aa..b5b94ef411b7 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.io.storage.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.avro.VariantShreddingProvider;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieParquetConfig;
@@ -60,11 +61,6 @@ import static 
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSc
 
 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
 
-  // Sole provider today: variant shredding currently requires Spark 4.0+. If more providers are
-  // added, restore a candidate list here and try each in turn.
-  private static final String SPARK4_VARIANT_SHREDDING_PROVIDER =
-      "org.apache.hudi.variant.Spark4VariantShreddingProvider";
-
   public HoodieAvroFileWriterFactory(HoodieStorage storage) {
     super(storage);
   }
@@ -152,7 +148,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
     Properties props = TypedProperties.copy(config.getProps());
     // Auto-detect variant shredding provider from classpath if not explicitly configured
     if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
-      String detectedClass = detectShreddingProviderClass();
+      String detectedClass = VariantShreddingProvider.detectProviderClassOnClasspath();
       if (detectedClass != null) {
         props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), detectedClass);
       }
@@ -167,17 +163,4 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
         getAvroSchemaConverter((Configuration) storageConf.unwrapAs(Configuration.class)).convert(effectiveSchema), schema, filter, props);
   }
 
-  /**
-   * Auto-detect a {@link org.apache.hudi.avro.VariantShreddingProvider} implementation
-   * available on the classpath. Returns the fully-qualified class name if found, or null.
-   */
-  private static String detectShreddingProviderClass() {
-    try {
-      Class.forName(SPARK4_VARIANT_SHREDDING_PROVIDER);
-      return SPARK4_VARIANT_SHREDDING_PROVIDER;
-    } catch (ClassNotFoundException e) {
-      // not on classpath
-      return null;
-    }
-  }
 }
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java
index e03eec3fed17..5c8cfa9922be 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java
@@ -185,19 +185,23 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
     //       sure that in case the file-schema is not equal to read-schema we'd still
     //       be able to read that file (in case projection is a proper one)
     Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
-    HoodieSchema repairedFileSchema = HoodieSchemaRepair.repairLogicalTypes(getSchema(), schema);
+    // If the on-disk file shreds variant columns, read them in their shredded (typed_value-bearing)
+    // shape and reconstruct the unshredded variant per-record below (#18931). null when not needed.
+    HoodieVariantReconstruction reconstruction = HoodieVariantReconstruction.create(getSchema(), schema, storage);
+    HoodieSchema readSchema = reconstruction == null ? schema : reconstruction.intermediateSchema();
+    HoodieSchema repairedFileSchema = HoodieSchemaRepair.repairLogicalTypes(getSchema(), readSchema);
     Option<HoodieSchema> promotedSchema = Option.empty();
-    if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema.toAvroSchema(), schema.toAvroSchema())) {
+    if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema.toAvroSchema(), readSchema.toAvroSchema())) {
       AvroReadSupport.setAvroReadSchema(hadoopConf, repairedFileSchema.toAvroSchema());
       AvroReadSupport.setRequestedProjection(hadoopConf, repairedFileSchema.toAvroSchema());
-      promotedSchema = Option.of(schema);
+      promotedSchema = Option.of(readSchema);
     } else {
-      AvroReadSupport.setAvroReadSchema(hadoopConf, schema.toAvroSchema());
-      AvroReadSupport.setRequestedProjection(hadoopConf, schema.toAvroSchema());
+      AvroReadSupport.setAvroReadSchema(hadoopConf, readSchema.toAvroSchema());
+      AvroReadSupport.setRequestedProjection(hadoopConf, readSchema.toAvroSchema());
     }
     ParquetReader<IndexedRecord> reader =
         new HoodieAvroParquetReaderBuilder<IndexedRecord>(path)
-            .withTableSchema(getAvroSchemaConverter(hadoopConf).convert(schema))
+            .withTableSchema(getAvroSchemaConverter(hadoopConf).convert(readSchema))
             .withConf(hadoopConf)
             .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS))
             .set(ParquetInputFormat.STRICT_TYPE_CHECKING, hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
@@ -206,7 +210,10 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
         ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get(), renamedColumns)
         : new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
-    return parquetReaderIterator;
+    if (reconstruction == null) {
+      return parquetReaderIterator;
+    }
+    return new CloseableMappingIterator<>(parquetReaderIterator, reconstruction::reconstruct);
   }
 
   @Override
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java
new file mode 100644
index 000000000000..325c233d928c
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.avro.VariantSchemaUtils;
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reconstructs unshredded variants when reading an already-shredded base file 
on the Avro
+ * ({@code HoodieRecordType.AVRO}) read path.
+ *
+ * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as
+ * a raw {@code {metadata, value, typed_value}} record. This class reads such columns at their shredded
+ * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}}
+ * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the
+ * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this class.
+ *
+ * <p>See https://github.com/apache/hudi/issues/18931.
+ */
+final class HoodieVariantReconstruction {
+
+  private final HoodieSchema intermediateSchema;
+  private final Schema outputAvroSchema;
+  private final VariantShreddingProvider provider;
+  // The arrays below are indexed by field position in the record (requested and output field order match).
+  // For target fields they hold the file's shredded sub-schema and the unshredded sub-schema to rebuild into; schema entries are null for non-targets.
+  private final boolean[] isTarget;
+  private final Schema[] shreddedSubSchemas;
+  private final Schema[] unshreddedSubSchemas;
+
+  private HoodieVariantReconstruction(HoodieSchema intermediateSchema, Schema 
outputAvroSchema,
+                                VariantShreddingProvider provider, boolean[] 
isTarget,
+                                Schema[] shreddedSubSchemas, Schema[] 
unshreddedSubSchemas) {
+    this.intermediateSchema = intermediateSchema;
+    this.outputAvroSchema = outputAvroSchema;
+    this.provider = provider;
+    this.isTarget = isTarget;
+    this.shreddedSubSchemas = shreddedSubSchemas;
+    this.unshreddedSubSchemas = unshreddedSubSchemas;
+  }
+
+  /**
+   * Schema to read the parquet file with: the requested schema, but with 
shredded variant columns
+   * swapped to their file (typed_value-bearing) form so parquet-avro 
materializes {@code typed_value}.
+   */
+  HoodieSchema intermediateSchema() {
+    return intermediateSchema;
+  }
+
+  /**
+   * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when
+   * none is needed (either schema is not a record, or no requested column is a shredded variant in the file).
+   * Throws a {@link HoodieException} when there are shredded variant columns to reconstruct but reading
+   * shredded variants is disabled, or no provider is available: either way, reading at the unshredded schema
+   * would silently drop the typed_value payload.
+   */
+  static HoodieVariantReconstruction create(HoodieSchema fileSchema, 
HoodieSchema requestedSchema, HoodieStorage storage) {
+    if (requestedSchema.getType() != HoodieSchemaType.RECORD || 
fileSchema.getType() != HoodieSchemaType.RECORD) {
+      return null;
+    }
+
+    List<HoodieSchemaField> requestedFields = requestedSchema.getFields();
+    List<HoodieSchemaField> intermediateFields = new ArrayList<>();
+    boolean[] isTarget = new boolean[requestedFields.size()];
+    boolean anyTarget = false;
+    for (int i = 0; i < requestedFields.size(); i++) {
+      HoodieSchemaField requestedField = requestedFields.get(i);
+      Option<HoodieSchemaField> fileField = 
fileSchema.getField(requestedField.name());
+      if (fileField.isPresent() && 
isShreddedVariant(fileField.get().schema())) {
+        isTarget[i] = true;
+        anyTarget = true;
+        // Read this column in its on-disk shredded shape.
+        
intermediateFields.add(requestedField.withSchema(fileField.get().schema()));
+      } else {
+        // Copy non-target fields too (withSchema makes a fresh Avro Field): 
reusing the requested
+        // field's Avro Field, already bound to the requested record, would 
fail Schema.setFields with
+        // "Field already used" when building the intermediate record below.
+        
intermediateFields.add(requestedField.withSchema(requestedField.schema()));
+      }
+    }
+    if (!anyTarget) {
+      // No shredded variant columns in the file: nothing to reconstruct, 
regardless of the flag.
+      return null;
+    }
+
+    if 
(!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(),
+        
HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue())) {
+      // Reading at the unshredded schema would drop typed_value and silently 
corrupt variants whose
+      // payload lives there, so fail fast. Mirrors the no-provider branch and 
Spark's
+      // allowReadingShredded=false, which rejects shredded reads rather than 
discarding data.
+      throw new HoodieException("Base file has shredded variant column(s) but 
reading shredded variants is "
+          + "disabled (" + 
HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key()
+          + "=false). Enable it to reconstruct them; otherwise the typed_value 
payload would be silently dropped.");
+    }
+
+    VariantShreddingProvider provider = loadProvider(storage);
+    if (provider == null) {
+      // Reading would drop typed_value and silently corrupt variants whose 
payload lives there, so fail fast.
+      throw new HoodieException("Base file has shredded variant column(s) and 
reading shredded variants is "
+          + "enabled, but no VariantShreddingProvider is available to 
reconstruct them. Set "
+          + HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key()
+          + " or add a provider implementation (e.g. the Spark variant module) 
to the classpath.");
+    }
+
+    HoodieSchema intermediateSchema = HoodieSchema.createRecord(
+        requestedSchema.getAvroSchema().getName(),
+        requestedSchema.getAvroSchema().getNamespace(),
+        requestedSchema.getAvroSchema().getDoc(),
+        intermediateFields);
+    // Records leave this reader unshredded; output field order matches the 
requested/intermediate order.
+    HoodieSchema outputSchema = 
VariantSchemaUtils.stripVariantShredding(requestedSchema);
+
+    Schema[] shreddedSubSchemas = new Schema[requestedFields.size()];
+    Schema[] unshreddedSubSchemas = new Schema[requestedFields.size()];
+    for (int i = 0; i < requestedFields.size(); i++) {
+      if (isTarget[i]) {
+        shreddedSubSchemas[i] = 
unwrapNullable(fileSchema.getField(requestedFields.get(i).name()).get().schema()).getAvroSchema();
+        unshreddedSubSchemas[i] = 
unwrapNullable(outputSchema.getFields().get(i).schema()).getAvroSchema();
+      }
+    }
+
+    return new HoodieVariantReconstruction(intermediateSchema, 
outputSchema.toAvroSchema(), provider,
+        isTarget, shreddedSubSchemas, unshreddedSubSchemas);
+  }
+
+  /**
+   * Rebuilds shredded variant columns of {@code in} (read in the intermediate 
shredded shape) into
+   * a record conforming to the unshredded output schema.
+   */
+  IndexedRecord reconstruct(IndexedRecord in) {
+    GenericRecord out = new GenericData.Record(outputAvroSchema);
+    for (int i = 0; i < isTarget.length; i++) {
+      Object value = in.get(i);
+      if (isTarget[i] && value instanceof GenericRecord) {
+        out.put(i, provider.rebuildVariantRecord((GenericRecord) value, 
shreddedSubSchemas[i], unshreddedSubSchemas[i]));
+      } else {
+        // Non-variant column, or a null variant column: pass through 
unchanged.
+        out.put(i, value);
+      }
+    }
+    return out;
+  }
+
+  private static boolean isShreddedVariant(HoodieSchema schema) {
+    HoodieSchema unwrapped = unwrapNullable(schema);
+    return unwrapped.getType() == HoodieSchemaType.VARIANT
+        && ((HoodieSchema.Variant) unwrapped).isShredded();
+  }
+
+  private static HoodieSchema unwrapNullable(HoodieSchema schema) {
+    return schema.isNullable() ? schema.getNonNullType() : schema;
+  }
+
+  private static VariantShreddingProvider loadProvider(HoodieStorage storage) {
+    String providerClass = storage.getConf()
+        
.getString(HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key()).orElse(null);
+    if (providerClass == null || providerClass.isEmpty()) {
+      providerClass = 
VariantShreddingProvider.detectProviderClassOnClasspath();
+    }
+    return providerClass == null ? null : (VariantShreddingProvider) 
ReflectionUtils.loadClass(providerClass);
+  }
+}
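
The per-record step in reconstruct() above can be illustrated without Avro or
Hudi on the classpath. The following is a minimal sketch, not the committed
implementation: rows are modeled as plain Object[] instead of IndexedRecord,
the class and method names (SelectiveRebuildSketch, reconstruct) are
hypothetical, and the rebuild function stands in for
VariantShreddingProvider#rebuildVariantRecord. It also tests for non-null
rather than "instanceof GenericRecord", which is a simplification.

```java
import java.util.Arrays;
import java.util.function.UnaryOperator;

public class SelectiveRebuildSketch {

  // Hypothetical stand-in for the reconstruct step: walk the fields by position,
  // rebuild only the columns flagged as targets, and pass everything else through.
  static Object[] reconstruct(Object[] in, boolean[] isTarget, UnaryOperator<Object> rebuild) {
    Object[] out = new Object[isTarget.length];
    for (int i = 0; i < isTarget.length; i++) {
      Object value = in[i];
      // A null target column (a null variant) is passed through unchanged.
      out[i] = (isTarget[i] && value != null) ? rebuild.apply(value) : value;
    }
    return out;
  }

  public static void main(String[] args) {
    boolean[] isTarget = {false, true, true};
    // Assumption: a string-decorating function in place of the real variant rebuild.
    UnaryOperator<Object> rebuild = v -> "rebuilt(" + v + ")";
    Object[] out = reconstruct(new Object[] {1, "shredded", null}, isTarget, rebuild);
    System.out.println(Arrays.toString(out));
  }
}
```

Keeping the target flags in a position-indexed array means the per-record loop
does no name lookups, which matches the shape of the isTarget field above.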
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
index 9cbee1b44cbc..e4e86abef939 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
@@ -24,22 +24,15 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieParquetConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.schema.HoodieSchemaField;
-import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -48,17 +41,12 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
 import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieBaseParquetWriter {
@@ -130,43 +118,4 @@ public class TestHoodieBaseParquetWriter {
           "The writer stops write new records while the file doesn't reach the 
max file size limit");
     }
   }
-
-  @Test
-  public void testRejectAlreadyShreddedVariantInput() {
-    // A record read from an already-shredded base file 
(compaction/clustering) arrives with a
-    // populated typed_value. The Avro write support cannot reconstruct the 
unshredded variant yet,
-    // so it must fail fast rather than silently drop the payload. Guard 
tracked in
-    // https://github.com/apache/hudi/issues/18931.
-    HoodieSchema recordSchema = HoodieSchema.createRecord("guardTest", null, 
null, Arrays.asList(
-        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
-        HoodieSchemaField.of("v", HoodieSchema.createVariant())));
-
-    // Shredding disabled so no VariantShreddingProvider is required; the 
variant field is still tracked.
-    Properties props = new Properties();
-    
props.setProperty(HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
 "false");
-
-    HoodieSchema effectiveSchema = 
HoodieAvroWriteSupport.generateEffectiveSchema(recordSchema, props);
-    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
-        new AvroSchemaConverter().convert(effectiveSchema.toAvroSchema()), 
recordSchema, Option.empty(), props);
-
-    // Input whose variant column is already shredded (carries typed_value).
-    Map<String, HoodieSchema> shreddedFields = new LinkedHashMap<>();
-    shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.INT));
-    Schema shreddedVariantAvro =
-        HoodieSchema.createVariantShreddedObject(null, null, null, 
shreddedFields).getAvroSchema();
-
-    Schema inputAvro = Schema.createRecord("guardInput", null, null, false);
-    inputAvro.setFields(Arrays.asList(
-        new Schema.Field("id", Schema.create(Schema.Type.INT), null, (Object) 
null),
-        new Schema.Field("v", shreddedVariantAvro, null, (Object) null)));
-    GenericRecord variantValue = new GenericData.Record(shreddedVariantAvro);
-    variantValue.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, 
ByteBuffer.wrap(new byte[] {1}));
-    GenericRecord input = new GenericData.Record(inputAvro);
-    input.put("id", 1);
-    input.put("v", variantValue);
-
-    HoodieException ex = assertThrows(HoodieException.class, () -> 
writeSupport.write(input));
-    assertTrue(ex.getMessage().contains("already-shredded") && 
ex.getMessage().contains("18931"),
-        "Expected the read-then-reshred guard message, got: " + 
ex.getMessage());
-  }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstruction.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstruction.java
new file mode 100644
index 000000000000..872a98a16460
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstruction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieVariantReconstruction {
+
+  private static HoodieSchema recordWithVariant(HoodieSchema variantSchema) {
+    return HoodieSchema.createRecord("test_record", "org.apache.hudi.test", 
null,
+        Collections.singletonList(HoodieSchemaField.of("v", variantSchema)));
+  }
+
+  private static HoodieStorage storageWithReadingShredded(Path tmp, boolean 
enabled) {
+    HoodieStorage storage = HoodieTestUtils.getStorage(tmp.toString());
+    
storage.getConf().set(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(),
+        Boolean.toString(enabled));
+    return storage;
+  }
+
+  @Test
+  void failsFastWhenShreddedColumnPresentButReadingDisabled(@TempDir Path tmp) 
{
+    HoodieSchema fileSchema = recordWithVariant(
+        
HoodieSchema.createVariantShredded(HoodieSchema.create(HoodieSchemaType.INT)));
+    HoodieSchema requestedSchema = 
recordWithVariant(HoodieSchema.createVariant());
+
+    HoodieException ex = assertThrows(HoodieException.class, () ->
+        HoodieVariantReconstruction.create(fileSchema, requestedSchema,
+            storageWithReadingShredded(tmp, false)));
+    // The flag check must run after shredded-column detection: dropping 
typed_value silently would
+    // corrupt rows whose payload lives there, so a disabled read of a 
shredded file fails fast.
+    assertTrue(ex.getMessage().contains("reading shredded variants is 
disabled"), ex.getMessage());
+  }
+
+  @Test
+  void returnsNullWhenNoShreddedColumnEvenIfReadingDisabled(@TempDir Path tmp) 
{
+    HoodieSchema schema = recordWithVariant(HoodieSchema.createVariant());
+    // No shredded variant column to reconstruct: nothing to do regardless of 
the flag.
+    assertNull(HoodieVariantReconstruction.create(schema, schema,
+        storageWithReadingShredded(tmp, false)));
+  }
+}
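
The ordering that the two tests above pin down (detect shredded columns first,
then consult the flag, then look for a provider) can be condensed into a small
decision function. This is a sketch under stated assumptions: the class name,
the Outcome enum, and the boolean parameters are hypothetical, and
IllegalStateException stands in for HoodieException so the example has no
dependencies.

```java
public class ShreddedReadGateSketch {

  enum Outcome { NOT_NEEDED, RECONSTRUCT }

  // Hypothetical condensation of the checks performed when building a reconstruction.
  static Outcome decide(boolean fileHasShreddedVariant, boolean readingEnabled,
                        boolean providerAvailable) {
    if (!fileHasShreddedVariant) {
      // Nothing to reconstruct, regardless of the flag or the provider.
      return Outcome.NOT_NEEDED;
    }
    if (!readingEnabled) {
      // Reading at the unshredded schema would drop typed_value, so fail fast.
      throw new IllegalStateException("reading shredded variants is disabled");
    }
    if (!providerAvailable) {
      throw new IllegalStateException("no VariantShreddingProvider is available");
    }
    return Outcome.RECONSTRUCT;
  }

  public static void main(String[] args) {
    System.out.println(decide(false, false, false));
    System.out.println(decide(true, true, true));
    try {
      decide(true, false, true);
    } catch (IllegalStateException e) {
      System.out.println("failed fast: " + e.getMessage());
    }
  }
}
```

Checking the flag only after detection keeps unshredded files readable when the
flag is off, while a shredded file with the flag off still fails loudly.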
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index 3108b96fdd18..ebf835008412 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -125,11 +125,23 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
   }
 
   test("Test Query Log Only MOR Table With VARIANT column triggers 
compaction") {
-    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or 
higher")
+    // Gated on Spark >= 4.1. Compaction writes the base file via the AVRO 
shredding writer, which
+    // lays the variant group out as [metadata, value, typed_value]. Spark 
4.0's read support
+    // (Spark40HoodieParquetReadSupport.reorderVariantFields) rebuilds that 
group as [value, metadata]
+    // and drops typed_value, so the subsequent native read fails with 
MALFORMED_VARIANT. Spark 4.1+
+    // reads variant fields by name (SPARK-54410) and reconstructs correctly.
+    // TODO(voon): drop this comment once Spark 4.0 is removed.
+    assume(HoodieSparkUtils.gteqSpark4_1, "Shredded variant base-file read 
requires Spark 4.1 or higher")
 
     withRecordType()(withTempDir { tmp =>
       val tableName = generateTableName
       val tablePath = tmp.getCanonicalPath
+      // Shred variants on write so the compacted base file is shredded, then 
read it back. Note the
+      // Spark SQL read here goes through the InternalRow reader 
(SparkFileFormatInternalRowReaderContext),
+      // which reconstructs the shredded variant natively - it does NOT 
exercise the AVRO read-path
+      // reconstruction (#18931, HoodieVariantReconstruction), which Spark 
compaction never reaches.
+      // That path is covered directly by TestSpark4VariantShreddingProvider 
and
+      // TestHoodieVariantReconstruction.
       spark.sql(
         s"""
            |create table $tableName (
@@ -142,6 +154,8 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
            |  primaryKey = 'id',
            |  type = 'mor',
            |  preCombineField = 'ts',
+           |  hoodie.parquet.variant.write.shredding.enabled = 'true',
+           |  hoodie.parquet.variant.force.shredding.schema.for.test = 'key 
string',
            |  hoodie.index.type = 'INMEMORY',
            |  hoodie.compact.inline = 'true',
            |  hoodie.compact.inline.max.delta.commits = '5',
@@ -578,9 +592,9 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
            |insert into $tableName values
            |  (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
         """.stripMargin)
-      // Reading shredded variants back needs Spark 4.1+ 
(spark.sql.variant.allowReadingShredded);
-      // Spark 4.0's reader rejects the 3-field shredded layout (4.0 read 
support is added later, see
-      // https://github.com/apache/hudi/issues/18931). The shredded write is 
still validated below.
+      // Reading shredded variants back via Spark SQL needs Spark 4.1+ 
(spark.sql.variant.allowReadingShredded,
+      // SPARK-54410); Spark 4.0's native reader rejects the 3-field shredded 
layout. The shredded write is
+      // still validated below.
       if (HoodieSparkUtils.gteqSpark4_1) {
         checkAnswer(s"select id, name, cast(v as string), ts from $tableName 
order by id")(
           Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
index a0aa663f9463..5864a56d0c26 100644
--- 
a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
@@ -21,18 +21,23 @@ package org.apache.hudi.variant;
 
 import org.apache.hudi.avro.VariantShreddingProvider;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.types.variant.ShreddingUtils;
 import org.apache.spark.types.variant.Variant;
 import org.apache.spark.types.variant.VariantSchema;
 import org.apache.spark.types.variant.VariantShreddingWriter;
 import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult;
 import 
org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.IdentityHashMap;
@@ -86,6 +91,35 @@ public class Spark4VariantShreddingProvider implements 
VariantShreddingProvider
     return result.toGenericRecord();
   }
 
+  @Override
+  public GenericRecord rebuildVariantRecord(
+      GenericRecord shreddedVariant,
+      Schema shreddedSchema,
+      Schema unshreddedSchema) {
+
+    if (shreddedVariant == null) {
+      return null;
+    }
+    ByteBuffer metadataBuf = (ByteBuffer) 
shreddedVariant.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+    if (metadataBuf == null) {
+      // metadata is REQUIRED in the shredded schema, so a null here means the 
base file is malformed;
+      // fail fast rather than silently nulling out the variant column.
+      throw new HoodieException("Cannot reconstruct variant: shredded record 
is missing the required '"
+          + HoodieSchema.Variant.VARIANT_METADATA_FIELD + "' field; the base 
file is malformed.");
+    }
+
+    // Reuse the same VariantSchema index assignment as the write path (no 
builder needed on read).
+    VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, null);
+
+    // Delegate to Spark's reconstruction algorithm (inverse of castShredded).
+    Variant variant = ShreddingUtils.rebuild(new 
AvroVariantRow(shreddedVariant, sparkSchema), sparkSchema);
+
+    GenericRecord out = new GenericData.Record(unshreddedSchema);
+    out.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, 
ByteBuffer.wrap(variant.getMetadata()));
+    out.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, 
ByteBuffer.wrap(variant.getValue()));
+    return out;
+  }
+
   /**
    * Builds a {@link VariantSchema} from an Avro {@link Schema} representing a
    * shredded variant structure ({@code value}, {@code metadata}, {@code 
typed_value}).
@@ -150,7 +184,11 @@ public class Spark4VariantShreddingProvider implements 
VariantShreddingProvider
         typedIdx, variantIdx, topLevelMetadataIdx, numFields,
         scalarSchema, objectSchema, arraySchema);
 
-    builder.registerSchema(result, avroSchema);
+    // The read (rebuild) path passes a null builder: it needs the 
VariantSchema indices but no
+    // Avro-schema registration (registration only feeds write-side result 
construction).
+    if (builder != null) {
+      builder.registerSchema(result, avroSchema);
+    }
 
     return result;
   }
@@ -178,11 +216,12 @@ public class Spark4VariantShreddingProvider implements 
VariantShreddingProvider
       if ("local-timestamp-micros".equals(name)) {
         return new VariantSchema.TimestampNTZType();
       }
-      if ("timestamp-millis".equals(name)) {
-        return new VariantSchema.TimestampType();
-      }
-      if ("local-timestamp-millis".equals(name)) {
-        return new VariantSchema.TimestampNTZType();
+      // The Variant binary spec stores timestamps in microseconds, so a 
millisecond-precision
+      // typed_value cannot represent a variant timestamp. Decline to shred it 
as a scalar (the value
+      // stays in the residual unshredded binary) rather than mapping it to 
the micros TimestampType,
+      // which would silently scale the value by 1000x.
+      if ("timestamp-millis".equals(name) || 
"local-timestamp-millis".equals(name)) {
+        return null;
       }
       if ("uuid".equals(name)) {
         return new VariantSchema.UuidType();
@@ -398,4 +437,240 @@ public class Spark4VariantShreddingProvider implements 
VariantShreddingProvider
       return true;
     }
   }
+
+  /**
+   * Base {@link ShreddingUtils.ShreddedRow} with all accessors throwing; 
concrete rows override
+   * only the accessors valid for their nesting context. This is the read-path 
mirror of the
+   * write-path {@link AvroShreddedResult}: it reads Avro records to feed 
Spark's reconstruction
+   * ({@link ShreddingUtils#rebuild}).
+   */
+  abstract static class BaseAvroShreddedRow implements 
ShreddingUtils.ShreddedRow {
+    @Override
+    public boolean isNullAt(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public boolean getBoolean(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public byte getByte(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public short getShort(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public int getInt(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public long getLong(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public float getFloat(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public double getDouble(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public BigDecimal getDecimal(int ordinal, int precision, int scale) {
+      throw unsupported();
+    }
+
+    @Override
+    public String getString(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public byte[] getBinary(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public UUID getUuid(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+      throw unsupported();
+    }
+
+    @Override
+    public ShreddingUtils.ShreddedRow getArray(int ordinal) {
+      throw unsupported();
+    }
+
+    @Override
+    public int numElements() {
+      throw unsupported();
+    }
+
+    private static UnsupportedOperationException unsupported() {
+      return new UnsupportedOperationException("Accessor not valid for this 
shredded row context");
+    }
+  }
+
+  /**
+   * A shredded variant struct {@code {value, [metadata], typed_value}}. Maps 
the Spark
+   * {@link VariantSchema} ordinals (variantIdx / topLevelMetadataIdx / 
typedIdx) back to the named
+   * Avro fields, and reads {@code typed_value} for scalar/object/array 
reconstruction.
+   */
+  static final class AvroVariantRow extends BaseAvroShreddedRow {
+    private final GenericRecord record;
+    private final VariantSchema schema;
+
+    AvroVariantRow(GenericRecord record, VariantSchema schema) {
+      this.record = record;
+      this.schema = schema;
+    }
+
+    private String fieldNameFor(int ordinal) {
+      if (ordinal == schema.typedIdx) {
+        return HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD;
+      }
+      if (ordinal == schema.variantIdx) {
+        return HoodieSchema.Variant.VARIANT_VALUE_FIELD;
+      }
+      if (ordinal == schema.topLevelMetadataIdx) {
+        return HoodieSchema.Variant.VARIANT_METADATA_FIELD;
+      }
+      throw new IllegalArgumentException("Unexpected shredded ordinal: " + 
ordinal);
+    }
+
+    @Override public boolean isNullAt(int ordinal) {
+      return record.get(fieldNameFor(ordinal)) == null;
+    }
+
+    @Override public byte[] getBinary(int ordinal) {
+      return toByteArray((ByteBuffer) record.get(fieldNameFor(ordinal)));
+    }
+
+    // The scalar getters below read typed_value directly: Spark only invokes 
them for the scalar
+    // typed_value (ordinal == typedIdx), so resolving via 
fieldNameFor(ordinal) would be redundant.
+    // isNullAt/getBinary stay on fieldNameFor because they are also called 
for value/metadata.
+    @Override public boolean getBoolean(int ordinal) {
+      return (Boolean) 
record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+    }
+
+    @Override public byte getByte(int ordinal) {
+      return ((Number) 
record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).byteValue();
+    }
+
+    @Override public short getShort(int ordinal) {
+      return ((Number) 
record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).shortValue();
+    }
+
+    @Override public int getInt(int ordinal) {
+      return ((Number) 
record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).intValue();
+    }
+
+    @Override public long getLong(int ordinal) {
+      return ((Number) 
record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).longValue();
+    }
+
+    @Override public float getFloat(int ordinal) {
+      return ((Number) 
record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).floatValue();
+    }
+
+    @Override public double getDouble(int ordinal) {
+      return ((Number) 
record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).doubleValue();
+    }
+
+    @Override public String getString(int ordinal) {
+      return 
record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD).toString();
+    }
+
+    @Override public UUID getUuid(int ordinal) {
+      return 
UUID.fromString(record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD).toString());
+    }
+
+    @Override public BigDecimal getDecimal(int ordinal, int precision, int scale) {
+      Object value = record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+      if (value instanceof BigDecimal) {
+        return (BigDecimal) value;
+      }
+      Schema tvSchema = unwrapNullable(record.getSchema().getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD).schema());
+      Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+      if (value instanceof ByteBuffer) {
+        return conversion.fromBytes((ByteBuffer) value, tvSchema, tvSchema.getLogicalType());
+      }
+      if (value instanceof GenericFixed) {
+        return conversion.fromFixed((GenericFixed) value, tvSchema, tvSchema.getLogicalType());
+      }
+      throw new IllegalStateException("Unexpected decimal representation: " + value);
+    }
+
+    @Override public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+      // Object shredding: typed_value is a record whose fields are the shredded object fields.
+      return new AvroObjectRow((GenericRecord) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD), schema);
+    }
+
+    @Override public ShreddingUtils.ShreddedRow getArray(int ordinal) {
+      return new AvroArrayRow((List<?>) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD), schema.arraySchema);
+    }
+  }
+
+  /**
+   * The {@code typed_value} record of an object-shredded variant: ordinal {@code i} addresses the
+   * i-th shredded object field (a nested {@code {value, typed_value}} struct).
+   */
+  static final class AvroObjectRow extends BaseAvroShreddedRow {
+    private final GenericRecord typedValueRecord;
+    private final VariantSchema parentSchema;
+
+    AvroObjectRow(GenericRecord typedValueRecord, VariantSchema parentSchema) {
+      this.typedValueRecord = typedValueRecord;
+      this.parentSchema = parentSchema;
+    }
+
+    @Override public boolean isNullAt(int ordinal) {
+      return typedValueRecord.get(parentSchema.objectSchema[ordinal].fieldName) == null;
+    }
+
+    @Override public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+      VariantSchema.ObjectField field = parentSchema.objectSchema[ordinal];
+      return new AvroVariantRow((GenericRecord) typedValueRecord.get(field.fieldName), field.schema);
+    }
+  }
+
+  /**
+   * The {@code typed_value} array of an array-shredded variant: each element is a shredded variant
+   * struct following {@code elementSchema}.
+   */
+  static final class AvroArrayRow extends BaseAvroShreddedRow {
+    private final List<?> elements;
+    private final VariantSchema elementSchema;
+
+    AvroArrayRow(List<?> elements, VariantSchema elementSchema) {
+      this.elements = elements;
+      this.elementSchema = elementSchema;
+    }
+
+    @Override public int numElements() {
+      return elements.size();
+    }
+
+    @Override public boolean isNullAt(int ordinal) {
+      return elements.get(ordinal) == null;
+    }
+
+    @Override public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+      return new AvroVariantRow((GenericRecord) elements.get(ordinal), elementSchema);
+    }
+  }
 }
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstructionRoundTrip.java b/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstructionRoundTrip.java
new file mode 100644
index 000000000000..0f851e98fa6a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstructionRoundTrip.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.variant.Spark4VariantShreddingProvider;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Round-trip coverage for the successful {@code create} -> {@code reconstruct} path of
+ * {@link HoodieVariantReconstruction}: the AVRO read-path orchestration (isTarget alignment,
+ * shredded/unshredded sub-schema indexing, and per-record field mapping).
+ *
+ * <p>Runs in hudi-spark4-common so the real {@link Spark4VariantShreddingProvider} is auto-detected
+ * on the classpath; the package is {@code org.apache.hudi.io.storage.hadoop} to reach the
+ * package-private class. Spark compaction never reaches this path (it reads base files via the
+ * InternalRow reader), so this is the only place the alignment is exercised end to end.
+ * The null/throw guards of {@code create} are covered by {@code TestHoodieVariantReconstruction}
+ * (hudi-hadoop-common); the provider's shred/rebuild in isolation by {@code TestSpark4VariantShreddingProvider}.
+ */
+class TestHoodieVariantReconstructionRoundTrip {
+
+  @Test
+  void createThenReconstructRebuildsVariantAndPassesThroughNonVariant(@TempDir Path tmp) throws Exception {
+    // A shredded variant column "v" alongside a non-variant column "id", to exercise field alignment.
+    Map<String, HoodieSchema> shreddedFields = new LinkedHashMap<>();
+    shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.STRING));
+    shreddedFields.put("b", HoodieSchema.create(HoodieSchemaType.LONG));
+    HoodieSchema.Variant shreddedVariant = HoodieSchema.createVariantShreddedObject(shreddedFields);
+    HoodieSchema.Variant unshreddedVariant = HoodieSchema.createVariant();
+
+    HoodieSchema fileSchema = HoodieSchema.createRecord("r", "org.apache.hudi.test", null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
+        HoodieSchemaField.of("v", shreddedVariant)));
+    HoodieSchema requestedSchema = HoodieSchema.createRecord("r", "org.apache.hudi.test", null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
+        HoodieSchemaField.of("v", unshreddedVariant)));
+
+    HoodieStorage storage = HoodieTestUtils.getStorage(tmp.toString()); // allow.reading.shredded defaults true
+    HoodieVariantReconstruction reconstruction =
+        HoodieVariantReconstruction.create(fileSchema, requestedSchema, storage);
+    assertNotNull(reconstruction, "create should build a reconstruction for a shredded variant column");
+
+    // Build the input the parquet-avro reader would produce: a record at the intermediate (shredded)
+    // schema, i.e. {id, v=<shredded {metadata, value, typed_value}>}.
+    Spark4VariantShreddingProvider provider = new Spark4VariantShreddingProvider();
+    Variant original = VariantBuilder.parseJson("{\"a\":\"x\",\"b\":5}", false);
+    GenericRecord unshreddedV = new GenericData.Record(unshreddedVariant.getAvroSchema());
+    unshreddedV.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(original.getMetadata()));
+    unshreddedV.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, ByteBuffer.wrap(original.getValue()));
+    GenericRecord shreddedV =
+        provider.shredVariantRecord(unshreddedV, shreddedVariant.getAvroSchema(), shreddedVariant);
+
+    GenericRecord input = new GenericData.Record(reconstruction.intermediateSchema().getAvroSchema());
+    input.put("id", 7L);
+    input.put("v", shreddedV);
+
+    IndexedRecord out = reconstruction.reconstruct(input);
+
+    // Non-variant column passes through unchanged at its position; the variant column is rebuilt unshredded.
+    assertEquals(7L, out.get(0));
+    GenericRecord rebuiltV = (GenericRecord) out.get(1);
+    Variant rebuilt = new Variant(
+        toBytes(rebuiltV.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD)),
+        toBytes(rebuiltV.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD)));
+    assertEquals(original.toJson(ZoneOffset.UTC), rebuilt.toJson(ZoneOffset.UTC));
+  }
+
+  private static byte[] toBytes(Object byteBuffer) {
+    ByteBuffer buf = ((ByteBuffer) byteBuffer).duplicate();
+    byte[] out = new byte[buf.remaining()];
+    buf.get(out);
+    return out;
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/variant/TestSpark4VariantShreddingProvider.java b/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/variant/TestSpark4VariantShreddingProvider.java
new file mode 100644
index 000000000000..37800e98dc02
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/variant/TestSpark4VariantShreddingProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.variant;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Round-trip coverage for {@link Spark4VariantShreddingProvider}: shred an unshredded variant, then
+ * reconstruct it, and assert it round-trips. This exercises {@code rebuildVariantRecord} and the
+ * {@code AvroVariantRow}/{@code AvroObjectRow}/{@code AvroArrayRow} accessors across scalar, object,
+ * and array shapes - the AVRO read-path reconstruction (#18931) that the Spark MOR SQL test cannot
+ * reach (Spark compaction reads base files via the InternalRow reader, not HoodieAvroParquetReader).
+ */
+class TestSpark4VariantShreddingProvider {
+
+  private final Spark4VariantShreddingProvider provider = new Spark4VariantShreddingProvider();
+  private final Schema unshreddedSchema = HoodieSchema.createVariant().getAvroSchema();
+
+  /** Parse json to a variant, shred it to {@code shredded}, rebuild it, assert the json round-trips. */
+  private void assertRoundTrips(String json, HoodieSchema.Variant shredded) throws Exception {
+    Variant variant = VariantBuilder.parseJson(json, false);
+    GenericRecord unshreddedRecord = new GenericData.Record(unshreddedSchema);
+    unshreddedRecord.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(variant.getMetadata()));
+    unshreddedRecord.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, ByteBuffer.wrap(variant.getValue()));
+
+    Schema shreddedSchema = shredded.getAvroSchema();
+    GenericRecord shreddedRecord = provider.shredVariantRecord(unshreddedRecord, shreddedSchema, shredded);
+    GenericRecord rebuilt = provider.rebuildVariantRecord(shreddedRecord, shreddedSchema, unshreddedSchema);
+
+    Variant rebuiltVariant = new Variant(
+        toBytes(rebuilt.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD)),
+        toBytes(rebuilt.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD)));
+    assertEquals(variant.toJson(ZoneOffset.UTC), rebuiltVariant.toJson(ZoneOffset.UTC),
+        "variant did not round-trip through shred/rebuild for: " + json);
+  }
+
+  private void assertScalarRoundTrips(String json, HoodieSchema typedValue) throws Exception {
+    assertRoundTrips(json, HoodieSchema.createVariantShredded(typedValue));
+  }
+
+  @Test
+  void numericRoundTrips() throws Exception {
+    assertScalarRoundTrips("42", HoodieSchema.create(HoodieSchemaType.LONG));
+  }
+
+  @Test
+  void stringRoundTrips() throws Exception {
+    assertScalarRoundTrips("\"hello world\"", HoodieSchema.create(HoodieSchemaType.STRING));
+  }
+
+  @Test
+  void booleanRoundTrips() throws Exception {
+    assertScalarRoundTrips("true", HoodieSchema.create(HoodieSchemaType.BOOLEAN));
+  }
+
+  @Test
+  void decimalRoundTrips() throws Exception {
+    assertScalarRoundTrips("123.45", HoodieSchema.createDecimal(10, 2));
+  }
+
+  @Test
+  void objectRoundTrips() throws Exception {
+    Map<String, HoodieSchema> shreddedFields = new LinkedHashMap<>();
+    shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.STRING));
+    shreddedFields.put("b", HoodieSchema.create(HoodieSchemaType.LONG));
+    assertRoundTrips("{\"a\":\"x\",\"b\":5}", HoodieSchema.createVariantShreddedObject(shreddedFields));
+  }
+
+  @Test
+  void arrayRoundTrips() throws Exception {
+    // typed_value for an array is array<{value, typed_value}>: each element is itself a shredded struct.
+    HoodieSchema element = HoodieSchema.createRecord("v_array_element", "org.apache.hudi.test", null, Arrays.asList(
+        HoodieSchemaField.of(HoodieSchema.Variant.VARIANT_VALUE_FIELD, HoodieSchema.createNullable(HoodieSchemaType.BYTES)),
+        HoodieSchemaField.of(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD, HoodieSchema.create(HoodieSchemaType.LONG))));
+    assertScalarRoundTrips("[1,2,3]", HoodieSchema.createArray(element));
+  }
+
+  private static byte[] toBytes(Object byteBuffer) {
+    ByteBuffer buf = ((ByteBuffer) byteBuffer).duplicate();
+    byte[] out = new byte[buf.remaining()];
+    buf.get(out);
+    return out;
+  }
+}
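
Note on the toBytes helper used by both tests: it copies through ByteBuffer.duplicate(), which presumably keeps the record's own buffer position untouched so the same field can be read more than once. The following is a minimal standalone sketch of that pattern using only the JDK; the class name ByteBufferCopySketch and the sample payload are hypothetical and not part of the commit.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of the Hudi change.
final class ByteBufferCopySketch {

  // Copy the readable bytes of `source` without moving its position.
  static byte[] toBytes(ByteBuffer source) {
    // duplicate() shares the content but has an independent position and limit.
    ByteBuffer view = source.duplicate();
    byte[] out = new byte[view.remaining()];
    view.get(out);
    return out;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap("variant".getBytes(StandardCharsets.UTF_8));
    byte[] first = toBytes(buf);
    byte[] second = toBytes(buf); // still sees all bytes, since buf was not consumed
    // Prints: variant 7 7
    System.out.println(new String(first, StandardCharsets.UTF_8)
        + " " + buf.remaining() + " " + second.length);
  }
}
```

Calling get(byte[]) directly on the original buffer would advance its position, so a second read of the same field would come back empty; copying through a duplicate avoids that.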
