This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3648230469f5 feat(variant): support reading shredded variant base
files via the AVRO reader (#18938)
3648230469f5 is described below
commit 3648230469f5e25663374d7bcc2dfd12b9100e31
Author: voonhous <[email protected]>
AuthorDate: Thu Jun 25 02:41:02 2026 +0800
feat(variant): support reading shredded variant base files via the AVRO
reader (#18938)
* feat(variant): reconstruct unshredded variant on the AVRO read path
(#18931)
Compaction/clustering reading an already-shredded base file via the AVRO record
path now rebuilds the unshredded {metadata, value} variant before records reach
the merger/writer, replacing the prior fail-fast guard.
- VariantShreddingProvider: add rebuildVariantRecord (inverse of
shredVariantRecord); Spark4VariantShreddingProvider implements it via Spark's
ShreddingUtils.rebuild over Avro-backed ShreddedRow rows.
- HoodieAvroParquetReader: detect shredded variant columns, read them at the
file's shredded schema, and reconstruct to unshredded per record
(VariantReconstruction); provider loaded via config or classpath, gated on
hoodie.parquet.variant.allow.reading.shredded.
- Extract stripVariantShredding into VariantSchemaUtils (shared by
reader/writer).
- Remove the read-then-reshred guard from HoodieAvroWriteSupport and its
unit test.
- Extend the MOR compaction test in TestVariantDataType to write shredded and
read back (AVRO reconstruction + SPARK native via withRecordType).
* test(variant): gate shredded MOR compaction test on Spark >= 4.1
The compaction base file is written by the AVRO shredding writer as
[metadata, value, typed_value]. Spark 4.0's reorderVariantFields rebuilds
that group as [value, metadata] and drops typed_value, so the native read
after compaction fails with MALFORMED_VARIANT. Spark 4.1+ reads variant
fields by name (SPARK-54410) and reconstructs correctly.
Fixes the spark4.0 leg of #18931.
* review(variant): drop no-op makeNullable in applyForcedShreddingSchema
withSchema(replacement) rebuilds the field from replacement, which already
mirrors the original nullability, so the intermediate makeNullable() was a
no-op (and could reset field order for non-nullable fields).
* review(variant): centralize shredding-provider classpath detection
VariantReconstruction and HoodieAvroFileWriterFactory each hardcoded the
provider FQN and a Class.forName lookup. Move the candidate list and detection
into a shared VariantShreddingProvider.detectProviderClassOnClasspath(), so a
new provider impl is registered in one place.
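A stdlib-only sketch of the ordered classpath probe described above. The class name ProviderDetectionSketch and the method firstAvailable are hypothetical stand-ins, not Hudi code; the only part taken from the commit is the idea of trying Class.forName on each candidate in priority order.

```java
public class ProviderDetectionSketch {

  /** Returns the first candidate class name that can be loaded, or null if none can. */
  public static String firstAvailable(String... candidates) {
    for (String candidate : candidates) {
      try {
        Class.forName(candidate);
        return candidate;
      } catch (ClassNotFoundException | NoClassDefFoundError e) {
        // Candidate is not on the classpath; try the next one.
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // "no.such.Provider" is a made-up name that will not load; java.lang.String always does.
    System.out.println(firstAvailable("no.such.Provider", "java.lang.String"));
    System.out.println(firstAvailable("no.such.Provider"));
  }
}
```

Keeping the candidate list next to the probe means a new implementation only has to be added to one array.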
* review(variant): rename VariantReconstruction to
HoodieVariantReconstruction
Every other class in org.apache.hudi.io.storage.hadoop carries the Hoodie
prefix (HoodieAvroParquetReader, HoodieHadoopIOFactory, ...); this was the lone
exception. Package-private with a single caller, so the rename is contained.
* review(variant): rename nonNull to unwrapNullable in
HoodieVariantReconstruction
The helper unwraps a nullable union type; it is not a null-assertion guard. The
new name reflects what it does at each call site.
* fix(variant): fail fast when a shredded base file has no reconstruction
provider
When a requested column is shredded in the base file and reading shredded
variants is enabled but no provider is loadable, create() returned null and the
reader fell back to the unshredded requested schema. parquet-avro then reads
value/metadata and drops typed_value, silently corrupting variants whose payload
lived in typed_value. Throw HoodieException instead, mirroring the write path.
Removes the now-unused logger.
* fix(variant): fail fast on missing metadata in rebuildVariantRecord
metadata is REQUIRED in the shredded parquet schema, so a null metadata on the
read path means a malformed base file. Returning null let the caller null out
the whole variant column, silently dropping data; throw HoodieException instead.
The separate null-record guard stays (genuine null variant passes through).
* test(variant): correct stale #18931 reference in shredded read-back
comment
The Spark SQL read-back is gated on Spark 4.1+ because Spark 4.0's native
parquet reader rejects the 3-field shredded layout (SPARK-54410) - not because
of #18931. #18931 is the AVRO reader reconstruction and does not affect the
Spark native read, so drop the misattributed pointer.
* review(variant): document why AvroVariantRow scalar getters read
typed_value directly
The scalar getters ignore ordinal and read typed_value directly while
isNullAt/getBinary go through fieldNameFor(ordinal). Spark only calls the
scalar getters for the scalar typed_value, so the asymmetry is intentional;
add a comment so it does not read as an oversight.
* fix(variant): decline millisecond-precision timestamps in shredding type
mapping
avroTypeToScalarType mapped timestamp-millis / local-timestamp-millis to the
micros-based TimestampType / TimestampNTZType. The Variant binary spec stores
timestamps in microseconds, so a millis-precision typed_value would be read back
as micros and scaled 1000x. Unreachable today (Hudi/Spark only produce micros
typed_value), but decline to shred millis as a scalar so it can never silently
corrupt; the value stays in the residual unshredded binary.
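The 1000x factor can be checked with plain arithmetic. This is an illustrative sketch only: TimestampPrecisionSketch and toEpochSeconds are hypothetical names and nothing here touches Hudi, Spark, or Avro. It interprets the same raw long once as milliseconds (what was written) and once as microseconds (what a micros-based reader would assume).

```java
import java.util.concurrent.TimeUnit;

public class TimestampPrecisionSketch {

  /** Interprets a raw epoch count in the given unit and returns whole epoch seconds. */
  public static long toEpochSeconds(long raw, TimeUnit unit) {
    return unit.toSeconds(raw);
  }

  public static void main(String[] args) {
    long raw = 1_700_000_000_000L; // a value written with millisecond precision

    long intended = toEpochSeconds(raw, TimeUnit.MILLISECONDS); // 1_700_000_000 s
    long misread = toEpochSeconds(raw, TimeUnit.MICROSECONDS);  // 1_700_000 s

    // Reading millis as micros shrinks the instant by a factor of 1000.
    System.out.println(intended / misread);
  }
}
```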
* fix(variant): fail fast on disabled shredded read instead of dropping
typed_value
HoodieVariantReconstruction.create returned null on allow.reading.shredded=false
before detecting whether the file actually has shredded variant columns. A
shredded base file was then read at the unshredded requested schema, dropping
typed_value -- rows whose payload lived there came back with a null value
(silent corruption), which compaction/clustering could then persist.
Move the flag check below the shredded-column detection: return null when no
shredded variant column is present (nothing to reconstruct, regardless of the
flag), and throw when shredded columns are present but reading is disabled -
mirroring the no-provider branch and Spark's allowReadingShredded=false, which
rejects shredded reads rather than discarding data. Adds a regression test.
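The reordered checks can be modelled as a small decision function. This is a simplified sketch, not the actual create() body: the class, enum, and parameter names are hypothetical, IllegalStateException stands in for HoodieException, and the relative order of the flag check and the provider check is an assumption. The point it illustrates is that detection runs first, so the flag can never turn a shredded file into a silent unshredded read.

```java
public class ShreddedReadDecisionSketch {

  public enum Decision { NO_RECONSTRUCTION, RECONSTRUCT }

  /**
   * Decides how to read a base file. Detection of shredded columns comes first;
   * the flag and provider checks only apply once there is something to rebuild.
   */
  public static Decision decide(boolean fileHasShreddedVariant,
                                boolean allowReadingShredded,
                                boolean providerAvailable) {
    if (!fileHasShreddedVariant) {
      // Nothing to reconstruct, regardless of the flag.
      return Decision.NO_RECONSTRUCTION;
    }
    if (!allowReadingShredded) {
      throw new IllegalStateException("shredded variant columns present but reading is disabled");
    }
    if (!providerAvailable) {
      throw new IllegalStateException("shredded variant columns present but no provider is loadable");
    }
    return Decision.RECONSTRUCT;
  }

  public static void main(String[] args) {
    System.out.println(decide(false, false, false)); // unshredded file, flag off: plain read
    System.out.println(decide(true, true, true));    // shredded file, everything available
  }
}
```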
* test(variant): cover the Avro shredded-read reconstruction directly
The Spark MOR SQL test ("...triggers compaction") does not exercise the AVRO
reconstruction it was meant to cover: withRecordType only swaps the record
merger and log-block format, and Spark compaction reads base files via
SparkFileFormatInternalRowReaderContext (InternalRow), so
HoodieAvroParquetReader - where HoodieVariantReconstruction /
rebuildVariantRecord run - is never reached.
Add TestSpark4VariantShreddingProvider: a direct shred -> rebuild round-trip
over the provider, covering scalar (numeric/string/boolean/decimal), object, and
array variant shapes. This exercises rebuildVariantRecord and the AvroVariantRow/
AvroObjectRow/AvroArrayRow accessors that the SQL test cannot reach. Also
correct the misleading comment on the SQL test to state it covers the Spark
InternalRow (native) read of a shredded base file, not the Avro path.
* fix(variant): copy non-target fields when building the reconstruction
intermediate schema
HoodieVariantReconstruction.create reused the requested field instances for
non-target (non-variant) columns when assembling the intermediate read schema.
Those Avro Fields are already bound to the requested record, so Schema.setFields
throws "Field already used" as soon as the requested record has any non-variant
column alongside a shredded variant - i.e. every real table. The target branch
avoided this only because withSchema() builds a fresh Field; do the same for
non-target fields.
This was latent because nothing exercised the successful create -> reconstruct
path: Spark compaction reads base files via the InternalRow reader, and the
Java/Flink AVRO path fails fast first (no provider). Add
TestHoodieVariantReconstructionRoundTrip (a create -> reconstruct round-trip
with the real provider, asserting variant rebuild + non-variant pass-through and
field alignment), which is what surfaced the bug.
* docs(variant): correct rebuildVariantRecord javadoc to match the impl
The @return claimed null is returned when metadata is missing, but the Spark4
impl returns null only when shreddedVariant is null and throws HoodieException
on missing metadata. Fix the @return and add a matching @throws.
---
.../org/apache/hudi/avro/VariantSchemaUtils.java | 85 ++++++
.../apache/hudi/avro/VariantShreddingProvider.java | 45 ++++
.../apache/hudi/avro/HoodieAvroWriteSupport.java | 93 +------
.../hadoop/HoodieAvroFileWriterFactory.java | 21 +-
.../io/storage/hadoop/HoodieAvroParquetReader.java | 21 +-
.../hadoop/HoodieVariantReconstruction.java | 194 ++++++++++++++
.../io/hadoop/TestHoodieBaseParquetWriter.java | 51 ----
.../hadoop/TestHoodieVariantReconstruction.java | 75 ++++++
.../sql/hudi/dml/schema/TestVariantDataType.scala | 22 +-
.../variant/Spark4VariantShreddingProvider.java | 287 ++++++++++++++++++++-
.../TestHoodieVariantReconstructionRoundTrip.java | 113 ++++++++
.../TestSpark4VariantShreddingProvider.java | 118 +++++++++
12 files changed, 949 insertions(+), 176 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java
new file mode 100644
index 000000000000..aec64fa0cf3d
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/VariantSchemaUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shared helpers for converting between shredded and unshredded variant schemas.
+ * Used by both the write path ({@link HoodieAvroWriteSupport}) and the read path
+ * (variant reconstruction in the parquet reader).
+ */
+public class VariantSchemaUtils {
+
+ private VariantSchemaUtils() {
+ }
+
+ /**
+ * Strips shredding from top-level variant fields in {@code schema}, replacing each shredded
+ * variant with its unshredded form (dropping {@code typed_value}). Non-variant fields and
+ * already-unshredded variants pass through unchanged; returns {@code schema} as-is when nothing
+ * changes.
+ */
+ public static HoodieSchema stripVariantShredding(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ return schema;
+ }
+
+ List<HoodieSchemaField> fields = schema.getFields();
+ List<HoodieSchemaField> newFields = new ArrayList<>();
+ boolean changed = false;
+
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema fieldSchema = field.schema();
+ boolean wasNullable = fieldSchema.isNullable();
+ HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
+
+ if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
+ if (variant.isShredded()) {
+ HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
+ unwrapped.getAvroSchema().getName(),
+ unwrapped.getAvroSchema().getNamespace(),
+ unwrapped.getAvroSchema().getDoc());
+ HoodieSchema replacement = wasNullable ? HoodieSchema.createNullable(unshredded) : unshredded;
+ newFields.add(field.withSchema(replacement));
+ changed = true;
+ continue;
+ }
+ }
+ newFields.add(field);
+ }
+
+ if (!changed) {
+ return schema;
+ }
+
+ return HoodieSchema.createRecord(
+ schema.getAvroSchema().getName(),
+ schema.getAvroSchema().getNamespace(),
+ schema.getAvroSchema().getDoc(),
+ newFields);
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java b/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
index 90ffbee1e53f..910554c50a40 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
@@ -37,6 +37,29 @@ import org.apache.avro.generic.GenericRecord;
*/
public interface VariantShreddingProvider {
+ /**
+ * Provider implementations to auto-detect on the classpath, in priority order, when the variant
+ * shredding provider class is not set explicitly via config. Sole provider today: variant
+ * shredding currently requires Spark 4.0+.
+ */
+ String[] CLASSPATH_CANDIDATES = {"org.apache.hudi.variant.Spark4VariantShreddingProvider"};
+
+ /**
+ * Returns the fully-qualified class name of the first {@link VariantShreddingProvider}
+ * implementation available on the classpath, or {@code null} if none is present.
+ */
+ static String detectProviderClassOnClasspath() {
+ for (String candidate : CLASSPATH_CANDIDATES) {
+ try {
+ Class.forName(candidate);
+ return candidate;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ // Provider not on classpath; try the next candidate.
+ }
+ }
+ return null;
+ }
+
/**
* Transform an unshredded variant GenericRecord into a shredded one.
* <p>
@@ -63,4 +86,26 @@ public interface VariantShreddingProvider {
GenericRecord unshreddedVariant,
Schema shreddedSchema,
HoodieSchema.Variant variantSchema);
+
+ /**
+ * Reconstruct an unshredded variant GenericRecord from a shredded one (the inverse of
+ * {@link #shredVariantRecord}).
+ * <p>
+ * Used on the read path: records read from an already-shredded base file (compaction/clustering)
+ * arrive with {@code typed_value} populated. This rebuilds the full variant binary so the record
+ * presents the standard unshredded {@code {metadata, value}} shape before it reaches the
+ * merger/writer.
+ *
+ * @param shreddedVariant GenericRecord with {value, metadata, typed_value} read from a shredded base file
+ * @param shreddedSchema the Avro schema of {@code shreddedVariant} (carries typed_value)
+ * @param unshreddedSchema target Avro schema with {value: ByteBuffer, metadata: ByteBuffer}
+ * @return a GenericRecord conforming to {@code unshreddedSchema} with the full reconstructed
+ * variant binary in {@code value}, or {@code null} when {@code shreddedVariant} is null
+ * @throws org.apache.hudi.exception.HoodieException if {@code shreddedVariant} is missing its
+ * required {@code metadata} field (a malformed shredded base file)
+ */
+ GenericRecord rebuildVariantRecord(
+ GenericRecord shreddedVariant,
+ Schema shreddedSchema,
+ Schema unshreddedSchema);
}
\ No newline at end of file
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index 3cd3c965388b..497aa3008406 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -88,13 +88,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
*/
private final VariantShreddingProvider shreddingProvider;
- /**
- * Names of all variant-typed top-level fields, regardless of shredding. Used to fail fast on the
- * not-yet-supported read-then-reshred path (compaction/clustering over an already-shredded base
- * file). See https://github.com/apache/hudi/issues/18931.
- */
- private final String[] variantFieldNames;
-
public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema,
Option<BloomFilter> bloomFilterOpt,
Properties properties) {
this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema,
properties), bloomFilterOpt, properties);
@@ -115,10 +108,8 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
- // Single pass over the effective schema fields: collect every variant-typed field name (for the
- // read-then-reshred guard) and, when shredding is enabled, the subset that needs shredding.
+ // When shredding is enabled, collect the variant fields that need shredding.
Map<Integer, ShreddedVariantField> shreddedFields = new LinkedHashMap<>();
- List<String> variantNames = new ArrayList<>();
if (effectiveSchema.getType() == HoodieSchemaType.RECORD) {
List<HoodieSchemaField> fields = effectiveSchema.getFields();
@@ -132,7 +123,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
if (fieldSchema.getType() != HoodieSchemaType.VARIANT) {
continue;
}
- variantNames.add(field.name());
if (variantWriteShreddingEnabled) {
HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
if (variant.isShredded() && variant.getTypedValueField().isPresent()) {
@@ -149,7 +139,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
}
this.shreddedVariantFields = shreddedFields;
- this.variantFieldNames = variantNames.toArray(new String[0]);
// Load shredding provider via reflection if needed
if (!shreddedVariantFields.isEmpty()) {
@@ -194,7 +183,7 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
// Schemas from clustering/compaction may still be shredded (read from on-disk Parquet files
// written with shredding enabled), so we need to strip typed_value when shredding
// is disabled.
- return stripVariantShredding(hoodieSchema);
+ return VariantSchemaUtils.stripVariantShredding(hoodieSchema);
}
// Check if a forced shredding schema is configured
@@ -219,9 +208,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
@SuppressWarnings("unchecked")
@Override
public void write(T record) {
- if (variantFieldNames.length > 0) {
- assertInputNotAlreadyShredded((IndexedRecord) record);
- }
if (!shreddedVariantFields.isEmpty() && shreddingProvider != null) {
super.write((T) shredRecord((IndexedRecord) record));
} else {
@@ -273,32 +259,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
return shreddedRecord;
}
- /**
- * Fails fast on the not-yet-supported read-then-reshred path. Records read from an already-shredded
- * base file (compaction/clustering) arrive with a populated {@code typed_value} and a possibly-null
- * {@code value}. The writer has no logic to reconstruct the unshredded variant, so shredding would
- * silently drop the payload (shredding enabled) and the parquet writer would reject the null at the
- * REQUIRED {@code value} field (shredding disabled). Reconstruction is tracked in
- * https://github.com/apache/hudi/issues/18931.
- */
- private void assertInputNotAlreadyShredded(IndexedRecord inputRecord) {
- Schema inputSchema = inputRecord.getSchema();
- for (String fieldName : variantFieldNames) {
- Schema.Field field = inputSchema.getField(fieldName);
- if (field == null) {
- continue;
- }
- Object value = inputRecord.get(field.pos());
- if (value instanceof GenericRecord
- && ((GenericRecord) value).getSchema().getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD) != null) {
- throw new HoodieException("Writing an already-shredded variant field '" + fieldName
- + "' is not supported yet. Compaction/clustering read a base file written with variant "
- + "shredding and re-wrote it through the Avro path; the reader does not yet reconstruct "
- + "the unshredded variant. Tracked in https://github.com/apache/hudi/issues/18931.");
- }
- }
- }
-
@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
Map<String, String> extraMetadata =
@@ -365,7 +325,8 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
shreddedFields);
HoodieSchema replacement = wasNullable
? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
- newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+ // replacement already mirrors the field's original nullability, so withSchema alone suffices.
+ newFields.add(HoodieSchemaUtils.createNewSchemaField(field.withSchema(replacement)));
changed = true;
} else {
newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
@@ -434,52 +395,6 @@ public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
}
}
- /**
- * Strips shredding from variant fields in the schema.
- * Replaces shredded variant fields with unshredded variants (removing typed_value).
- */
- private static HoodieSchema stripVariantShredding(HoodieSchema schema) {
- if (schema.getType() != HoodieSchemaType.RECORD) {
- return schema;
- }
-
- List<HoodieSchemaField> fields = schema.getFields();
- List<HoodieSchemaField> newFields = new ArrayList<>();
- boolean changed = false;
-
- for (HoodieSchemaField field : fields) {
- HoodieSchema fieldSchema = field.schema();
- boolean wasNullable = fieldSchema.isNullable();
- HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
-
- if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
- HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
- if (variant.isShredded()) {
- // Replace with unshredded variant
- HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
- unwrapped.getAvroSchema().getName(),
- unwrapped.getAvroSchema().getNamespace(),
- unwrapped.getAvroSchema().getDoc());
- HoodieSchema replacement = wasNullable ? HoodieSchema.createNullable(unshredded) : unshredded;
- newFields.add(field.withSchema(replacement));
- changed = true;
- continue;
- }
- }
- newFields.add(field);
- }
-
- if (!changed) {
- return schema;
- }
-
- return HoodieSchema.createRecord(
- schema.getAvroSchema().getName(),
- schema.getAvroSchema().getNamespace(),
- schema.getAvroSchema().getDoc(),
- newFields);
- }
-
/**
* Extracts the non-null type from a union schema.
*/
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
index 4d2630d2e7aa..b5b94ef411b7 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
@@ -20,6 +20,7 @@
package org.apache.hudi.io.storage.hadoop;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.avro.VariantShreddingProvider;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieParquetConfig;
@@ -60,11 +61,6 @@ import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSc
public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
- // Sole provider today: variant shredding currently requires Spark 4.0+. If more providers are
- // added, restore a candidate list here and try each in turn.
- private static final String SPARK4_VARIANT_SHREDDING_PROVIDER =
- "org.apache.hudi.variant.Spark4VariantShreddingProvider";
-
public HoodieAvroFileWriterFactory(HoodieStorage storage) {
super(storage);
}
@@ -152,7 +148,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
Properties props = TypedProperties.copy(config.getProps());
// Auto-detect variant shredding provider from classpath if not explicitly configured
if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
- String detectedClass = detectShreddingProviderClass();
+ String detectedClass = VariantShreddingProvider.detectProviderClassOnClasspath();
if (detectedClass != null) {
props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), detectedClass);
}
@@ -167,17 +163,4 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
getAvroSchemaConverter((Configuration) storageConf.unwrapAs(Configuration.class)).convert(effectiveSchema), schema, filter, props);
}
- /**
- * Auto-detect a {@link org.apache.hudi.avro.VariantShreddingProvider} implementation
- * available on the classpath. Returns the fully-qualified class name if found, or null.
- private static String detectShreddingProviderClass() {
- try {
- Class.forName(SPARK4_VARIANT_SHREDDING_PROVIDER);
- return SPARK4_VARIANT_SHREDDING_PROVIDER;
- } catch (ClassNotFoundException e) {
- // not on classpath
- return null;
- }
- }
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java
index e03eec3fed17..5c8cfa9922be 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroParquetReader.java
@@ -185,19 +185,23 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
// sure that in case the file-schema is not equal to read-schema we'd still
// be able to read that file (in case projection is a proper one)
Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
- HoodieSchema repairedFileSchema = HoodieSchemaRepair.repairLogicalTypes(getSchema(), schema);
+ // If the on-disk file shreds variant columns, read them in their shredded (typed_value-bearing)
+ // shape and reconstruct the unshredded variant per-record below (#18931). null when not needed.
+ HoodieVariantReconstruction reconstruction = HoodieVariantReconstruction.create(getSchema(), schema, storage);
+ HoodieSchema readSchema = reconstruction == null ? schema : reconstruction.intermediateSchema();
+ HoodieSchema repairedFileSchema = HoodieSchemaRepair.repairLogicalTypes(getSchema(), readSchema);
Option<HoodieSchema> promotedSchema = Option.empty();
- if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema.toAvroSchema(), schema.toAvroSchema())) {
+ if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema.toAvroSchema(), readSchema.toAvroSchema())) {
AvroReadSupport.setAvroReadSchema(hadoopConf, repairedFileSchema.toAvroSchema());
AvroReadSupport.setRequestedProjection(hadoopConf, repairedFileSchema.toAvroSchema());
- promotedSchema = Option.of(schema);
+ promotedSchema = Option.of(readSchema);
} else {
- AvroReadSupport.setAvroReadSchema(hadoopConf, schema.toAvroSchema());
- AvroReadSupport.setRequestedProjection(hadoopConf, schema.toAvroSchema());
+ AvroReadSupport.setAvroReadSchema(hadoopConf, readSchema.toAvroSchema());
+ AvroReadSupport.setRequestedProjection(hadoopConf, readSchema.toAvroSchema());
}
ParquetReader<IndexedRecord> reader =
new HoodieAvroParquetReaderBuilder<IndexedRecord>(path)
- .withTableSchema(getAvroSchemaConverter(hadoopConf).convert(schema))
+ .withTableSchema(getAvroSchemaConverter(hadoopConf).convert(readSchema))
.withConf(hadoopConf)
.set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS))
.set(ParquetInputFormat.STRICT_TYPE_CHECKING, hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
@@ -206,7 +210,10 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get(), renamedColumns)
: new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
- return parquetReaderIterator;
+ if (reconstruction == null) {
+ return parquetReaderIterator;
+ }
+ return new CloseableMappingIterator<>(parquetReaderIterator, reconstruction::reconstruct);
}
@Override
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java
new file mode 100644
index 000000000000..325c233d928c
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.avro.VariantSchemaUtils;
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reconstructs unshredded variants when reading an already-shredded base file on the Avro
+ * ({@code HoodieRecordType.AVRO}) read path.
+ *
+ * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as
+ * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded
+ * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}}
+ * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the
+ * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this.
+ *
+ * <p>See https://github.com/apache/hudi/issues/18931.
+ */
+final class HoodieVariantReconstruction {
+
+ private final HoodieSchema intermediateSchema;
+ private final Schema outputAvroSchema;
+ private final VariantShreddingProvider provider;
+ // Indexed by field position in the (requested == output) record. For target fields, the file's
+ // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets.
+ private final boolean[] isTarget;
+ private final Schema[] shreddedSubSchemas;
+ private final Schema[] unshreddedSubSchemas;
+
+ private HoodieVariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema,
+ VariantShreddingProvider provider, boolean[] isTarget,
+ Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) {
+ this.intermediateSchema = intermediateSchema;
+ this.outputAvroSchema = outputAvroSchema;
+ this.provider = provider;
+ this.isTarget = isTarget;
+ this.shreddedSubSchemas = shreddedSubSchemas;
+ this.unshreddedSubSchemas = unshreddedSubSchemas;
+ }
+
+ /**
+ * Schema to read the parquet file with: the requested schema, but with shredded variant columns
+ * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}.
+ */
+ HoodieSchema intermediateSchema() {
+ return intermediateSchema;
+ }
+
+ /**
+ * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when
+ * none is needed (the file has no shredded variant columns). Throws when the file has shredded
+ * variant columns to reconstruct but reading shredded variants is disabled, or no provider is
+ * available: either way, reading at the unshredded schema would silently drop the typed_value payload.
+ */
+ static HoodieVariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) {
+ if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) {
+ return null;
+ }
+
+ List<HoodieSchemaField> requestedFields = requestedSchema.getFields();
+ List<HoodieSchemaField> intermediateFields = new ArrayList<>();
+ boolean[] isTarget = new boolean[requestedFields.size()];
+ boolean anyTarget = false;
+ for (int i = 0; i < requestedFields.size(); i++) {
+ HoodieSchemaField requestedField = requestedFields.get(i);
+ Option<HoodieSchemaField> fileField = fileSchema.getField(requestedField.name());
+ if (fileField.isPresent() && isShreddedVariant(fileField.get().schema())) {
+ isTarget[i] = true;
+ anyTarget = true;
+ // Read this column in its on-disk shredded shape.
+ intermediateFields.add(requestedField.withSchema(fileField.get().schema()));
+ } else {
+ // Copy non-target fields too (withSchema makes a fresh Avro Field): reusing the requested
+ // field's Avro Field, already bound to the requested record, would fail Schema.setFields with
+ // "Field already used" when building the intermediate record below.
+ intermediateFields.add(requestedField.withSchema(requestedField.schema()));
+ }
+ }
+ if (!anyTarget) {
+ // No shredded variant columns in the file: nothing to reconstruct, regardless of the flag.
+ return null;
+ }
+
+ if (!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(),
+ HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue())) {
+ // Reading at the unshredded schema would drop typed_value and silently corrupt variants whose
+ // payload lives there, so fail fast. Mirrors the no-provider branch and Spark's
+ // allowReadingShredded=false, which rejects shredded reads rather than discarding data.
+ throw new HoodieException("Base file has shredded variant column(s) but reading shredded variants is "
+ + "disabled (" + HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key()
+ + "=false). Enable it to reconstruct them; otherwise the typed_value payload would be silently dropped.");
+ }
+
+ VariantShreddingProvider provider = loadProvider(storage);
+ if (provider == null) {
+ // Reading would drop typed_value and silently corrupt variants whose payload lives there, so fail fast.
+ throw new HoodieException("Base file has shredded variant column(s) and reading shredded variants is "
+ + "enabled, but no VariantShreddingProvider is available to reconstruct them. Set "
+ + HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key()
+ + " or add a provider implementation (e.g. the Spark variant module) to the classpath.");
+ }
+
+ HoodieSchema intermediateSchema = HoodieSchema.createRecord(
+ requestedSchema.getAvroSchema().getName(),
+ requestedSchema.getAvroSchema().getNamespace(),
+ requestedSchema.getAvroSchema().getDoc(),
+ intermediateFields);
+ // Records leave this reader unshredded; output field order matches the requested/intermediate order.
+ HoodieSchema outputSchema = VariantSchemaUtils.stripVariantShredding(requestedSchema);
+
+ Schema[] shreddedSubSchemas = new Schema[requestedFields.size()];
+ Schema[] unshreddedSubSchemas = new Schema[requestedFields.size()];
+ for (int i = 0; i < requestedFields.size(); i++) {
+ if (isTarget[i]) {
+ shreddedSubSchemas[i] = unwrapNullable(fileSchema.getField(requestedFields.get(i).name()).get().schema()).getAvroSchema();
+ unshreddedSubSchemas[i] = unwrapNullable(outputSchema.getFields().get(i).schema()).getAvroSchema();
+ }
+ }
+
+ return new HoodieVariantReconstruction(intermediateSchema, outputSchema.toAvroSchema(), provider,
+ isTarget, shreddedSubSchemas, unshreddedSubSchemas);
+ }
+
+ /**
+ * Rebuilds shredded variant columns of {@code in} (read in the intermediate shredded shape) into
+ * a record conforming to the unshredded output schema.
+ */
+ IndexedRecord reconstruct(IndexedRecord in) {
+ GenericRecord out = new GenericData.Record(outputAvroSchema);
+ for (int i = 0; i < isTarget.length; i++) {
+ Object value = in.get(i);
+ if (isTarget[i] && value instanceof GenericRecord) {
+ out.put(i, provider.rebuildVariantRecord((GenericRecord) value, shreddedSubSchemas[i], unshreddedSubSchemas[i]));
+ } else {
+ // Non-variant column, or a null variant column: pass through unchanged.
+ out.put(i, value);
+ }
+ }
+ return out;
+ }
+
+ private static boolean isShreddedVariant(HoodieSchema schema) {
+ HoodieSchema unwrapped = unwrapNullable(schema);
+ return unwrapped.getType() == HoodieSchemaType.VARIANT
+ && ((HoodieSchema.Variant) unwrapped).isShredded();
+ }
+
+ private static HoodieSchema unwrapNullable(HoodieSchema schema) {
+ return schema.isNullable() ? schema.getNonNullType() : schema;
+ }
+
+ private static VariantShreddingProvider loadProvider(HoodieStorage storage) {
+ String providerClass = storage.getConf()
+ .getString(HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key()).orElse(null);
+ if (providerClass == null || providerClass.isEmpty()) {
+ providerClass = VariantShreddingProvider.detectProviderClassOnClasspath();
+ }
+ return providerClass == null ? null : (VariantShreddingProvider) ReflectionUtils.loadClass(providerClass);
+ }
+}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
index 9cbee1b44cbc..e4e86abef939 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
@@ -24,22 +24,15 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.HoodieParquetConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.schema.HoodieSchemaField;
-import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import lombok.Getter;
import lombok.Setter;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -48,17 +41,12 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.Map;
import java.util.Properties;
import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieBaseParquetWriter {
@@ -130,43 +118,4 @@ public class TestHoodieBaseParquetWriter {
"The writer stops write new records while the file doesn't reach the max file size limit");
}
}
-
- @Test
- public void testRejectAlreadyShreddedVariantInput() {
- // A record read from an already-shredded base file (compaction/clustering) arrives with a
- // populated typed_value. The Avro write support cannot reconstruct the unshredded variant yet,
- // so it must fail fast rather than silently drop the payload. Guard tracked in
- // https://github.com/apache/hudi/issues/18931.
- HoodieSchema recordSchema = HoodieSchema.createRecord("guardTest", null, null, Arrays.asList(
- HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
- HoodieSchemaField.of("v", HoodieSchema.createVariant())));
-
- // Shredding disabled so no VariantShreddingProvider is required; the variant field is still tracked.
- Properties props = new Properties();
- props.setProperty(HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(), "false");
-
- HoodieSchema effectiveSchema = HoodieAvroWriteSupport.generateEffectiveSchema(recordSchema, props);
- HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
- new AvroSchemaConverter().convert(effectiveSchema.toAvroSchema()), recordSchema, Option.empty(), props);
-
- // Input whose variant column is already shredded (carries typed_value).
- Map<String, HoodieSchema> shreddedFields = new LinkedHashMap<>();
- shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.INT));
- Schema shreddedVariantAvro =
- HoodieSchema.createVariantShreddedObject(null, null, null, shreddedFields).getAvroSchema();
-
- Schema inputAvro = Schema.createRecord("guardInput", null, null, false);
- inputAvro.setFields(Arrays.asList(
- new Schema.Field("id", Schema.create(Schema.Type.INT), null, (Object) null),
- new Schema.Field("v", shreddedVariantAvro, null, (Object) null)));
- GenericRecord variantValue = new GenericData.Record(shreddedVariantAvro);
- variantValue.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(new byte[] {1}));
- GenericRecord input = new GenericData.Record(inputAvro);
- input.put("id", 1);
- input.put("v", variantValue);
-
- HoodieException ex = assertThrows(HoodieException.class, () -> writeSupport.write(input));
- assertTrue(ex.getMessage().contains("already-shredded") && ex.getMessage().contains("18931"),
- "Expected the read-then-reshred guard message, got: " + ex.getMessage());
- }
}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstruction.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstruction.java
new file mode 100644
index 000000000000..872a98a16460
--- /dev/null
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstruction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieVariantReconstruction {
+
+ private static HoodieSchema recordWithVariant(HoodieSchema variantSchema) {
+ return HoodieSchema.createRecord("test_record", "org.apache.hudi.test", null,
+ Collections.singletonList(HoodieSchemaField.of("v", variantSchema)));
+ }
+
+ private static HoodieStorage storageWithReadingShredded(Path tmp, boolean enabled) {
+ HoodieStorage storage = HoodieTestUtils.getStorage(tmp.toString());
+ storage.getConf().set(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(),
+ Boolean.toString(enabled));
+ return storage;
+ }
+
+ @Test
+ void failsFastWhenShreddedColumnPresentButReadingDisabled(@TempDir Path tmp) {
+ HoodieSchema fileSchema = recordWithVariant(
+ HoodieSchema.createVariantShredded(HoodieSchema.create(HoodieSchemaType.INT)));
+ HoodieSchema requestedSchema = recordWithVariant(HoodieSchema.createVariant());
+
+ HoodieException ex = assertThrows(HoodieException.class, () ->
+ HoodieVariantReconstruction.create(fileSchema, requestedSchema,
+ storageWithReadingShredded(tmp, false)));
+ // The flag check must run after shredded-column detection: dropping typed_value silently would
+ // corrupt rows whose payload lives there, so a disabled read of a shredded file fails fast.
+ assertTrue(ex.getMessage().contains("reading shredded variants is disabled"), ex.getMessage());
+ }
+
+ @Test
+ void returnsNullWhenNoShreddedColumnEvenIfReadingDisabled(@TempDir Path tmp) {
+ HoodieSchema schema = recordWithVariant(HoodieSchema.createVariant());
+ // No shredded variant column to reconstruct: nothing to do regardless of the flag.
+ assertNull(HoodieVariantReconstruction.create(schema, schema,
+ storageWithReadingShredded(tmp, false)));
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index 3108b96fdd18..ebf835008412 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -125,11 +125,23 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
}
test("Test Query Log Only MOR Table With VARIANT column triggers compaction") {
- assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+ // Gated on Spark >= 4.1. Compaction writes the base file via the AVRO shredding writer, which
+ // lays the variant group out as [metadata, value, typed_value]. Spark 4.0's read support
+ // (Spark40HoodieParquetReadSupport.reorderVariantFields) rebuilds that group as [value, metadata]
+ // and drops typed_value, so the subsequent native read fails with MALFORMED_VARIANT. Spark 4.1+
+ // reads variant fields by name (SPARK-54410) and reconstructs correctly.
+ // TODO(voon): drop this comment once Spark 4.0 is removed.
+ assume(HoodieSparkUtils.gteqSpark4_1, "Shredded variant base-file read requires Spark 4.1 or higher")
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
val tablePath = tmp.getCanonicalPath
+ // Shred variants on write so the compacted base file is shredded, then read it back. Note the
+ // Spark SQL read here goes through the InternalRow reader (SparkFileFormatInternalRowReaderContext),
+ // which reconstructs the shredded variant natively - it does NOT exercise the AVRO read-path
+ // reconstruction (#18931, HoodieVariantReconstruction), which Spark compaction never reaches.
+ // That path is covered directly by TestSpark4VariantShreddingProvider and
+ // TestHoodieVariantReconstruction.
spark.sql(
s"""
|create table $tableName (
@@ -142,6 +154,8 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'ts',
+ | hoodie.parquet.variant.write.shredding.enabled = 'true',
+ | hoodie.parquet.variant.force.shredding.schema.for.test = 'key string',
| hoodie.index.type = 'INMEMORY',
| hoodie.compact.inline = 'true',
| hoodie.compact.inline.max.delta.commits = '5',
@@ -578,9 +592,9 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
|insert into $tableName values
| (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
""".stripMargin)
- // Reading shredded variants back needs Spark 4.1+ (spark.sql.variant.allowReadingShredded);
- // Spark 4.0's reader rejects the 3-field shredded layout (4.0 read support is added later, see
- // https://github.com/apache/hudi/issues/18931). The shredded write is still validated below.
+ // Reading shredded variants back via Spark SQL needs Spark 4.1+ (spark.sql.variant.allowReadingShredded,
+ // SPARK-54410); Spark 4.0's native reader rejects the 3-field shredded layout. The shredded write is
+ // still validated below.
if (HoodieSparkUtils.gteqSpark4_1) {
checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
index a0aa663f9463..5864a56d0c26 100644
--- a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
+++ b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
@@ -21,18 +21,23 @@ package org.apache.hudi.variant;
import org.apache.hudi.avro.VariantShreddingProvider;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.types.variant.ShreddingUtils;
import org.apache.spark.types.variant.Variant;
import org.apache.spark.types.variant.VariantSchema;
import org.apache.spark.types.variant.VariantShreddingWriter;
import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult;
import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.IdentityHashMap;
@@ -86,6 +91,35 @@ public class Spark4VariantShreddingProvider implements VariantShreddingProvider
return result.toGenericRecord();
}
+ @Override
+ public GenericRecord rebuildVariantRecord(
+ GenericRecord shreddedVariant,
+ Schema shreddedSchema,
+ Schema unshreddedSchema) {
+
+ if (shreddedVariant == null) {
+ return null;
+ }
+ ByteBuffer metadataBuf = (ByteBuffer) shreddedVariant.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+ if (metadataBuf == null) {
+ // metadata is REQUIRED in the shredded schema, so a null here means the base file is malformed;
+ // fail fast rather than silently nulling out the variant column.
+ throw new HoodieException("Cannot reconstruct variant: shredded record is missing the required '"
+ + HoodieSchema.Variant.VARIANT_METADATA_FIELD + "' field; the base file is malformed.");
+ }
+
+ // Reuse the same VariantSchema index assignment as the write path (no builder needed on read).
+ VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, null);
+
+ // Delegate to Spark's reconstruction algorithm (inverse of castShredded).
+ Variant variant = ShreddingUtils.rebuild(new AvroVariantRow(shreddedVariant, sparkSchema), sparkSchema);
+
+ GenericRecord out = new GenericData.Record(unshreddedSchema);
+ out.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(variant.getMetadata()));
+ out.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, ByteBuffer.wrap(variant.getValue()));
+ return out;
+ }
+
/**
* Builds a {@link VariantSchema} from an Avro {@link Schema} representing a
* shredded variant structure ({@code value}, {@code metadata}, {@code typed_value}).
@@ -150,7 +184,11 @@ public class Spark4VariantShreddingProvider implements VariantShreddingProvider
typedIdx, variantIdx, topLevelMetadataIdx, numFields,
scalarSchema, objectSchema, arraySchema);
- builder.registerSchema(result, avroSchema);
+ // The read (rebuild) path passes a null builder: it needs the VariantSchema indices but no
+ // Avro-schema registration (registration only feeds write-side result construction).
+ if (builder != null) {
+ builder.registerSchema(result, avroSchema);
+ }
return result;
}
@@ -178,11 +216,12 @@ public class Spark4VariantShreddingProvider implements VariantShreddingProvider
if ("local-timestamp-micros".equals(name)) {
return new VariantSchema.TimestampNTZType();
}
- if ("timestamp-millis".equals(name)) {
- return new VariantSchema.TimestampType();
- }
- if ("local-timestamp-millis".equals(name)) {
- return new VariantSchema.TimestampNTZType();
+ // The Variant binary spec stores timestamps in microseconds, so a millisecond-precision
+ // typed_value cannot represent a variant timestamp. Decline to shred it as a scalar (the value
+ // stays in the residual unshredded binary) rather than mapping it to the micros TimestampType,
+ // which would silently scale the value by 1000x.
+ if ("timestamp-millis".equals(name) || "local-timestamp-millis".equals(name)) {
+ return null;
}
if ("uuid".equals(name)) {
return new VariantSchema.UuidType();
@@ -398,4 +437,240 @@ public class Spark4VariantShreddingProvider implements VariantShreddingProvider
return true;
}
}
+
+ /**
+ * Base {@link ShreddingUtils.ShreddedRow} with all accessors throwing; concrete rows override
+ * only the accessors valid for their nesting context. This is the read-path mirror of the
+ * write-path {@link AvroShreddedResult}: it reads Avro records to feed Spark's reconstruction
+ * ({@link ShreddingUtils#rebuild}).
+ */
+ abstract static class BaseAvroShreddedRow implements ShreddingUtils.ShreddedRow {
+ @Override
+ public boolean isNullAt(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public byte getByte(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public short getShort(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public int getInt(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public long getLong(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public float getFloat(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public double getDouble(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public BigDecimal getDecimal(int ordinal, int precision, int scale) {
+ throw unsupported();
+ }
+
+ @Override
+ public String getString(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public UUID getUuid(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+ throw unsupported();
+ }
+
+ @Override
+ public ShreddingUtils.ShreddedRow getArray(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public int numElements() {
+ throw unsupported();
+ }
+
+ private static UnsupportedOperationException unsupported() {
+ return new UnsupportedOperationException("Accessor not valid for this shredded row context");
+ }
+ }
+
+ /**
+ * A shredded variant struct {@code {value, [metadata], typed_value}}. Maps the Spark
+ * {@link VariantSchema} ordinals (variantIdx / topLevelMetadataIdx / typedIdx) back to the named
+ * Avro fields, and reads {@code typed_value} for scalar/object/array reconstruction.
+ */
+ static final class AvroVariantRow extends BaseAvroShreddedRow {
+ private final GenericRecord record;
+ private final VariantSchema schema;
+
+ AvroVariantRow(GenericRecord record, VariantSchema schema) {
+ this.record = record;
+ this.schema = schema;
+ }
+
+ private String fieldNameFor(int ordinal) {
+ if (ordinal == schema.typedIdx) {
+ return HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD;
+ }
+ if (ordinal == schema.variantIdx) {
+ return HoodieSchema.Variant.VARIANT_VALUE_FIELD;
+ }
+ if (ordinal == schema.topLevelMetadataIdx) {
+ return HoodieSchema.Variant.VARIANT_METADATA_FIELD;
+ }
+ throw new IllegalArgumentException("Unexpected shredded ordinal: " + ordinal);
+ }
+
+ @Override public boolean isNullAt(int ordinal) {
+ return record.get(fieldNameFor(ordinal)) == null;
+ }
+
+ @Override public byte[] getBinary(int ordinal) {
+ return toByteArray((ByteBuffer) record.get(fieldNameFor(ordinal)));
+ }
+
+ // The scalar getters below read typed_value directly: Spark only invokes them for the scalar
+ // typed_value (ordinal == typedIdx), so resolving via fieldNameFor(ordinal) would be redundant.
+ // isNullAt/getBinary stay on fieldNameFor because they are also called for value/metadata.
+ @Override public boolean getBoolean(int ordinal) {
+ return (Boolean) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+ }
+
+ @Override public byte getByte(int ordinal) {
+ return ((Number) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).byteValue();
+ }
+
+ @Override public short getShort(int ordinal) {
+ return ((Number) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).shortValue();
+ }
+
+ @Override public int getInt(int ordinal) {
+ return ((Number) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).intValue();
+ }
+
+ @Override public long getLong(int ordinal) {
+ return ((Number) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).longValue();
+ }
+
+ @Override public float getFloat(int ordinal) {
+ return ((Number) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).floatValue();
+ }
+
+ @Override public double getDouble(int ordinal) {
+ return ((Number) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)).doubleValue();
+ }
+
+ @Override public String getString(int ordinal) {
+ return record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD).toString();
+ }
+
+ @Override public UUID getUuid(int ordinal) {
+ return UUID.fromString(record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD).toString());
+ }
+
+ @Override public BigDecimal getDecimal(int ordinal, int precision, int scale) {
+ Object value = record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+ if (value instanceof BigDecimal) {
+ return (BigDecimal) value;
+ }
+ Schema tvSchema = unwrapNullable(record.getSchema().getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD).schema());
+ Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+ if (value instanceof ByteBuffer) {
+ return conversion.fromBytes((ByteBuffer) value, tvSchema, tvSchema.getLogicalType());
+ }
+ if (value instanceof GenericFixed) {
+ return conversion.fromFixed((GenericFixed) value, tvSchema, tvSchema.getLogicalType());
+ }
+ throw new IllegalStateException("Unexpected decimal representation: " + value);
+ }
+
+ @Override public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+ // Object shredding: typed_value is a record whose fields are the shredded object fields.
+ return new AvroObjectRow((GenericRecord) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD), schema);
+ }
+
+ @Override public ShreddingUtils.ShreddedRow getArray(int ordinal) {
+ return new AvroArrayRow((List<?>) record.get(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD), schema.arraySchema);
+ }
+ }
+
+ /**
+ * The {@code typed_value} record of an object-shredded variant: ordinal {@code i} addresses the
+ * i-th shredded object field (a nested {@code {value, typed_value}} struct).
+ */
+ static final class AvroObjectRow extends BaseAvroShreddedRow {
+ private final GenericRecord typedValueRecord;
+ private final VariantSchema parentSchema;
+
+ AvroObjectRow(GenericRecord typedValueRecord, VariantSchema parentSchema) {
+ this.typedValueRecord = typedValueRecord;
+ this.parentSchema = parentSchema;
+ }
+
+ @Override public boolean isNullAt(int ordinal) {
+ return typedValueRecord.get(parentSchema.objectSchema[ordinal].fieldName) == null;
+ }
+
+ @Override public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+ VariantSchema.ObjectField field = parentSchema.objectSchema[ordinal];
+ return new AvroVariantRow((GenericRecord) typedValueRecord.get(field.fieldName), field.schema);
+ }
+ }
+
+ /**
+ * The {@code typed_value} array of an array-shredded variant: each element is a shredded variant
+ * struct following {@code elementSchema}.
+ */
+ static final class AvroArrayRow extends BaseAvroShreddedRow {
+ private final List<?> elements;
+ private final VariantSchema elementSchema;
+
+ AvroArrayRow(List<?> elements, VariantSchema elementSchema) {
+ this.elements = elements;
+ this.elementSchema = elementSchema;
+ }
+
+ @Override public int numElements() {
+ return elements.size();
+ }
+
+ @Override public boolean isNullAt(int ordinal) {
+ return elements.get(ordinal) == null;
+ }
+
+ @Override public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+ return new AvroVariantRow((GenericRecord) elements.get(ordinal), elementSchema);
+ }
+ }
}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstructionRoundTrip.java b/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstructionRoundTrip.java
new file mode 100644
index 000000000000..0f851e98fa6a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/io/storage/hadoop/TestHoodieVariantReconstructionRoundTrip.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.variant.Spark4VariantShreddingProvider;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Round-trip coverage for the successful {@code create} -> {@code reconstruct} path of
+ * {@link HoodieVariantReconstruction}: the AVRO read-path orchestration (isTarget alignment,
+ * shredded/unshredded sub-schema indexing, and per-record field mapping).
+ *
+ * <p>Runs in hudi-spark4-common so the real {@link Spark4VariantShreddingProvider} is auto-detected
+ * on the classpath; the package is {@code org.apache.hudi.io.storage.hadoop} to reach the
+ * package-private class. Spark compaction never reaches this path (it reads base files via the
+ * InternalRow reader), so this is the only place the alignment is exercised end to end.
+ * The null/throw guards of {@code create} are covered by {@code TestHoodieVariantReconstruction}
+ * (hudi-hadoop-common); the provider's shred/rebuild in isolation by {@code TestSpark4VariantShreddingProvider}.
+ */
+class TestHoodieVariantReconstructionRoundTrip {
+
+ @Test
+ void createThenReconstructRebuildsVariantAndPassesThroughNonVariant(@TempDir Path tmp) throws Exception {
+ // A shredded variant column "v" alongside a non-variant column "id", to exercise field alignment.
+ Map<String, HoodieSchema> shreddedFields = new LinkedHashMap<>();
+ shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.STRING));
+ shreddedFields.put("b", HoodieSchema.create(HoodieSchemaType.LONG));
+ HoodieSchema.Variant shreddedVariant = HoodieSchema.createVariantShreddedObject(shreddedFields);
+ HoodieSchema.Variant unshreddedVariant = HoodieSchema.createVariant();
+
+ HoodieSchema fileSchema = HoodieSchema.createRecord("r", "org.apache.hudi.test", null, Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("v", shreddedVariant)));
+ HoodieSchema requestedSchema = HoodieSchema.createRecord("r", "org.apache.hudi.test", null, Arrays.asList(
+ HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of("v", unshreddedVariant)));
+
+ HoodieStorage storage = HoodieTestUtils.getStorage(tmp.toString()); // allow.reading.shredded defaults true
+ HoodieVariantReconstruction reconstruction =
+ HoodieVariantReconstruction.create(fileSchema, requestedSchema, storage);
+ assertNotNull(reconstruction, "create should build a reconstruction for a shredded variant column");
+
+ // Build the input the parquet-avro reader would produce: a record at the intermediate (shredded)
+ // schema, i.e. {id, v=<shredded {metadata, value, typed_value}>}.
+ Spark4VariantShreddingProvider provider = new Spark4VariantShreddingProvider();
+ Variant original = VariantBuilder.parseJson("{\"a\":\"x\",\"b\":5}", false);
+ GenericRecord unshreddedV = new GenericData.Record(unshreddedVariant.getAvroSchema());
+ unshreddedV.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(original.getMetadata()));
+ unshreddedV.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, ByteBuffer.wrap(original.getValue()));
+ GenericRecord shreddedV =
+ provider.shredVariantRecord(unshreddedV, shreddedVariant.getAvroSchema(), shreddedVariant);
+
+ GenericRecord input = new GenericData.Record(reconstruction.intermediateSchema().getAvroSchema());
+ input.put("id", 7L);
+ input.put("v", shreddedV);
+
+ IndexedRecord out = reconstruction.reconstruct(input);
+
+ // Non-variant column passes through unchanged at its position; the variant column is rebuilt unshredded.
+ assertEquals(7L, out.get(0));
+ GenericRecord rebuiltV = (GenericRecord) out.get(1);
+ Variant rebuilt = new Variant(
+ toBytes(rebuiltV.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD)),
+ toBytes(rebuiltV.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD)));
+ assertEquals(original.toJson(ZoneOffset.UTC), rebuilt.toJson(ZoneOffset.UTC));
+ }
+
+ private static byte[] toBytes(Object byteBuffer) {
+ ByteBuffer buf = ((ByteBuffer) byteBuffer).duplicate();
+ byte[] out = new byte[buf.remaining()];
+ buf.get(out);
+ return out;
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/variant/TestSpark4VariantShreddingProvider.java b/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/variant/TestSpark4VariantShreddingProvider.java
new file mode 100644
index 000000000000..37800e98dc02
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark4-common/src/test/java/org/apache/hudi/variant/TestSpark4VariantShreddingProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.variant;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Round-trip coverage for {@link Spark4VariantShreddingProvider}: shred an unshredded variant, then
+ * reconstruct it, and assert it round-trips. This exercises {@code rebuildVariantRecord} and the
+ * {@code AvroVariantRow}/{@code AvroObjectRow}/{@code AvroArrayRow} accessors across scalar, object,
+ * and array shapes - the AVRO read-path reconstruction (#18931) that the Spark MOR SQL test cannot
+ * reach (Spark compaction reads base files via the InternalRow reader, not HoodieAvroParquetReader).
+ */
+class TestSpark4VariantShreddingProvider {
+
+ private final Spark4VariantShreddingProvider provider = new Spark4VariantShreddingProvider();
+ private final Schema unshreddedSchema = HoodieSchema.createVariant().getAvroSchema();
+
+ /** Parse json to a variant, shred it to {@code shredded}, rebuild it, assert the json round-trips. */
+ private void assertRoundTrips(String json, HoodieSchema.Variant shredded) throws Exception {
+ Variant variant = VariantBuilder.parseJson(json, false);
+ GenericRecord unshreddedRecord = new GenericData.Record(unshreddedSchema);
+ unshreddedRecord.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(variant.getMetadata()));
+ unshreddedRecord.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, ByteBuffer.wrap(variant.getValue()));
+
+ Schema shreddedSchema = shredded.getAvroSchema();
+ GenericRecord shreddedRecord = provider.shredVariantRecord(unshreddedRecord, shreddedSchema, shredded);
+ GenericRecord rebuilt = provider.rebuildVariantRecord(shreddedRecord, shreddedSchema, unshreddedSchema);
+
+ Variant rebuiltVariant = new Variant(
+ toBytes(rebuilt.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD)),
+ toBytes(rebuilt.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD)));
+ assertEquals(variant.toJson(ZoneOffset.UTC), rebuiltVariant.toJson(ZoneOffset.UTC),
+ "variant did not round-trip through shred/rebuild for: " + json);
+ }
+
+ private void assertScalarRoundTrips(String json, HoodieSchema typedValue) throws Exception {
+ assertRoundTrips(json, HoodieSchema.createVariantShredded(typedValue));
+ }
+
+ @Test
+ void numericRoundTrips() throws Exception {
+ assertScalarRoundTrips("42", HoodieSchema.create(HoodieSchemaType.LONG));
+ }
+
+ @Test
+ void stringRoundTrips() throws Exception {
+ assertScalarRoundTrips("\"hello world\"", HoodieSchema.create(HoodieSchemaType.STRING));
+ }
+
+ @Test
+ void booleanRoundTrips() throws Exception {
+ assertScalarRoundTrips("true", HoodieSchema.create(HoodieSchemaType.BOOLEAN));
+ }
+
+ @Test
+ void decimalRoundTrips() throws Exception {
+ assertScalarRoundTrips("123.45", HoodieSchema.createDecimal(10, 2));
+ }
+
+ @Test
+ void objectRoundTrips() throws Exception {
+ Map<String, HoodieSchema> shreddedFields = new LinkedHashMap<>();
+ shreddedFields.put("a", HoodieSchema.create(HoodieSchemaType.STRING));
+ shreddedFields.put("b", HoodieSchema.create(HoodieSchemaType.LONG));
+ assertRoundTrips("{\"a\":\"x\",\"b\":5}", HoodieSchema.createVariantShreddedObject(shreddedFields));
+ }
+
+ @Test
+ void arrayRoundTrips() throws Exception {
+ // typed_value for an array is array<{value, typed_value}>: each element is itself a shredded struct.
+ HoodieSchema element = HoodieSchema.createRecord("v_array_element", "org.apache.hudi.test", null, Arrays.asList(
+ HoodieSchemaField.of(HoodieSchema.Variant.VARIANT_VALUE_FIELD, HoodieSchema.createNullable(HoodieSchemaType.BYTES)),
+ HoodieSchemaField.of(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD, HoodieSchema.create(HoodieSchemaType.LONG))));
+ assertScalarRoundTrips("[1,2,3]", HoodieSchema.createArray(element));
+ }
+
+ private static byte[] toBytes(Object byteBuffer) {
+ ByteBuffer buf = ((ByteBuffer) byteBuffer).duplicate();
+ byte[] out = new byte[buf.remaining()];
+ buf.get(out);
+ return out;
+ }
+}