This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 23fe7bfcd7e6 feat(variant): Add support to write shredded variants for
HoodieRecordType.AVRO (#18065)
23fe7bfcd7e6 is described below
commit 23fe7bfcd7e6cbbc72a05472384f942a88a812b8
Author: voonhous <[email protected]>
AuthorDate: Thu Jun 18 15:07:09 2026 +0800
feat(variant): Add support to write shredded variants for
HoodieRecordType.AVRO (#18065)
Adds write support for shredded Variant on the AVRO record path. Variants are
decomposed into a typed_value column for better compression and column pruning.
Shredding is delegated to a VariantShreddingProvider (in hudi-common, loaded
reflectively), with a Spark 4 implementation built on Spark's
VariantShreddingWriter.
- Config: new advanced hoodie.parquet.variant.shredding.provider.class
(sinceVersion 1.3.0), auto-detected from the classpath when unset. Writes are
gated by the existing write.shredding.enabled; reads are governed by
allow.reading.shredded.
- Write path: HoodieAvroFileWriterFactory builds the Parquet schema from the
effective (shredded) schema so value is nullable and typed_value is present;
injects the provider on a copied Properties so the shared config is never
mutated. Fails fast when shredding is required but no provider is available,
and on the not-yet-supported read-then-reshred path (#18931).
- Spark read: BaseSpark4Adapter recognizes both unshredded (2-field) and
shredded (3-field) physical layouts as Variant. Spark 4.0.x keeps a reorder
workaround for its hardcoded [value, metadata] converter; 4.1+ reads by name
(SPARK-54410), so the workaround is removable later (#18935). No session
SQLConf mutation - Spark's converter reads the flag via SQLConf.get.
- DDL parsing: shared StringUtils.splitTopLevelCommas (paren-aware) keeps
decimal(15, 1) intact, reused by both the shredding DDL parser and the vector
column metadata parser.
- Tests: Spark integration (TestVariantDataType) plus unit coverage for the
forced-shredding DDL (incl. decimal), shredded schema shape, the
read-then-reshred guard, and splitTopLevelCommas.
Limitations: shredded writes require Spark 4.0+; reading shredded data back
requires Spark 4.1+ (Spark 4.0 read deferred, #18931).
Closes #18066
Closes #17748
---
.../apache/hudi/avro/VariantShreddingProvider.java | 66 ++++
.../hudi/common/config/HoodieStorageConfig.java | 22 +-
.../apache/hudi/common/schema/HoodieSchema.java | 30 +-
.../apache/hudi/avro/HoodieAvroWriteSupport.java | 424 ++++++++++++++++++++-
.../hadoop/HoodieAvroFileWriterFactory.java | 37 +-
.../avro/TestHoodieAvroWriteSupportShredding.java | 71 ++++
.../io/hadoop/TestHoodieBaseParquetWriter.java | 51 +++
.../org/apache/hudi/common/util/StringUtils.java | 36 ++
.../apache/hudi/common/util/TestStringUtils.java | 15 +
.../sql/hudi/dml/schema/TestVariantDataType.scala | 158 ++++++++
.../variant/Spark4VariantShreddingProvider.java | 401 +++++++++++++++++++
.../spark/sql/adapter/BaseSpark4Adapter.scala | 23 +-
.../parquet/Spark40HoodieParquetReadSupport.scala | 2 +-
13 files changed, 1290 insertions(+), 46 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
b/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
new file mode 100644
index 000000000000..90ffbee1e53f
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Interface for shredding variant values at write time.
+ * <p>
+ * Implementations parse variant binary data (value + metadata bytes) and
produce
+ * a shredded {@link GenericRecord} with typed_value columns populated
according
+ * to the shredding schema.
+ * <p>
+ * This interface allows the variant binary parsing logic (which may depend on
+ * engine-specific libraries like Spark's variant module) to be loaded via
reflection,
+ * keeping the core write support free of engine-specific dependencies.
+ */
+public interface VariantShreddingProvider {
+
+ /**
+ * Transform an unshredded variant GenericRecord into a shredded one.
+ * <p>
+ * The input record is expected to have:
+ * <ul>
+ * <li>{@code value}: ByteBuffer containing the variant value binary</li>
+ * <li>{@code metadata}: ByteBuffer containing the variant metadata
binary</li>
+ * </ul>
+ * <p>
+ * The output record should conform to {@code shreddedSchema} and have:
+ * <ul>
+ * <li>{@code value}: ByteBuffer or null (null when typed_value captures
the full value)</li>
+ * <li>{@code metadata}: ByteBuffer (always present)</li>
+ * <li>{@code typed_value}: the typed representation extracted from the
variant binary,
+ * or null if the variant type does not match the typed_value
schema</li>
+ * </ul>
+ *
+ * @param unshreddedVariant GenericRecord with {value: ByteBuffer, metadata:
ByteBuffer}
+ * @param shreddedSchema target Avro schema with {value: nullable
ByteBuffer, metadata: ByteBuffer, typed_value: type}
+ * @param variantSchema HoodieSchema.Variant containing the shredding
schema information
+ * @return a GenericRecord conforming to shreddedSchema with typed_value
populated where possible
+ */
+ GenericRecord shredVariantRecord(
+ GenericRecord unshreddedVariant,
+ Schema shreddedSchema,
+ HoodieSchema.Variant variantSchema);
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index cbdf5cc223e0..4c3655e6cc64 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -235,11 +235,12 @@ public class HoodieStorageConfig extends HoodieConfig {
.noDefaultValue()
.markAdvanced()
.sinceVersion("1.1.0")
- .withDocumentation("Forces a specific shredding schema for all variant
columns, intended for testing. "
+ .withDocumentation("Test-only: forces a specific shredding schema for
all variant columns. Not intended "
+ + "for production use; it exists solely to exercise the shredding
write path in tests, mirroring "
+ + "Spark's internal spark.sql.variant.forceShreddingSchemaForTest. "
+ "The value should be a DDL-format schema string (e.g., 'a int, b
string, c decimal(15, 1)'). "
+ "When set and write shredding is enabled, this schema overrides
the schema-driven shredding "
- + "configuration for all variant columns. "
- + "Equivalent to Spark's
spark.sql.variant.forceShreddingSchemaForTest.");
+ + "configuration for all variant columns.");
public static final ConfigProperty<Boolean>
PARQUET_VARIANT_ALLOW_READING_SHREDDED = ConfigProperty
.key("hoodie.parquet.variant.allow.reading.shredded")
@@ -250,6 +251,16 @@ public class HoodieStorageConfig extends HoodieConfig {
+ "When disabled, only unshredded variant data can be read. "
+ "Equivalent to Spark's spark.sql.variant.allowReadingShredded.");
+ public static final ConfigProperty<String>
PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS = ConfigProperty
+ .key("hoodie.parquet.variant.shredding.provider.class")
+ .noDefaultValue()
+ .markAdvanced()
+ .sinceVersion("1.3.0")
+ .withDocumentation("Fully-qualified class name of the
VariantShreddingProvider implementation "
+ + "used to shred variant values at write time in the Avro record
path. "
+ + "The provider parses variant binary data and populates typed_value
columns. "
+ + "When not set, the provider is auto-detected from the classpath.");
+
public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE =
ConfigProperty
.key("hoodie.parquet.write.utc-timezone.enabled")
.defaultValue(true)
@@ -596,11 +607,6 @@ public class HoodieStorageConfig extends HoodieConfig {
return this;
}
- public Builder parquetVariantForceShreddingSchemaForTest(String
schemaString) {
- storageConfig.setValue(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST,
schemaString);
- return this;
- }
-
public Builder parquetVariantAllowReadingShredded(boolean allowed) {
storageConfig.setValue(PARQUET_VARIANT_ALLOW_READING_SHREDDED,
String.valueOf(allowed));
return this;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 0b0978d552a2..81f467124f20 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.schema;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieAvroSchemaException;
@@ -282,35 +283,18 @@ public class HoodieSchema implements Serializable {
* @return set of field names (preserves insertion order), or empty set if
input is null / empty
*/
public static Set<String> parseVectorColumnNames(String footerValue) {
- if (footerValue == null || footerValue.isEmpty()) {
- return Collections.emptySet();
- }
LinkedHashSet<String> names = new LinkedHashSet<>();
- int depth = 0;
- int start = 0;
- for (int i = 0; i < footerValue.length(); i++) {
- char c = footerValue.charAt(i);
- if (c == '(') {
- depth++;
- } else if (c == ')') {
- depth--;
- } else if (c == ',' && depth == 0) {
- addVectorColumnName(footerValue, start, i, names);
- start = i + 1;
+ // Split on top-level commas only so the comma inside a VECTOR(dim,
elemType) descriptor is
+ // not mistaken for a column separator.
+ for (String entry : StringUtils.splitTopLevelCommas(footerValue)) {
+ int colon = entry.indexOf(':');
+ if (colon > 0) {
+ names.add(entry.substring(0, colon).trim());
}
}
- addVectorColumnName(footerValue, start, footerValue.length(), names);
return names;
}
- private static void addVectorColumnName(String s, int start, int end,
Set<String> names) {
- int colon = s.indexOf(':', start);
- if (colon > start && colon < end) {
- names.add(s.substring(start, colon).trim());
- }
- }
-
-
private Schema avroSchema;
private HoodieSchemaType type;
private transient List<HoodieSchemaField> fields;
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index df7930060fe9..3cd3c965388b 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -20,22 +20,46 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
/**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriterSupport for plugging in the bloom filter and variant
shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link
HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code
typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider}
loaded via reflection.</p>
*/
public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
@@ -43,15 +67,236 @@ public class HoodieAvroWriteSupport<T> extends
AvroWriteSupport<T> {
private final Map<String, String> footerMetadata = new HashMap<>();
protected final Properties properties;
+ /**
+ * Whether variant write shredding is enabled via config.
+ */
+ private final boolean variantWriteShreddingEnabled;
+
+ /**
+ * The effective (possibly shredded) Avro schema used for writing.
+ */
+ private final Schema effectiveAvroSchema;
+
+ /**
+ * Variant fields that need shredding, keyed by their index in the effective
schema.
+ * Empty if no shredding is needed.
+ */
+ private final Map<Integer, ShreddedVariantField> shreddedVariantFields;
+
+ /**
+ * Provider for variant shredding (loaded via reflection). Null if no
shredding is needed.
+ */
+ private final VariantShreddingProvider shreddingProvider;
+
+ /**
+ * Names of all variant-typed top-level fields, regardless of shredding.
Used to fail fast on the
+ * not-yet-supported read-then-reshred path (compaction/clustering over an
already-shredded base
+ * file). See https://github.com/apache/hudi/issues/18931.
+ */
+ private final String[] variantFieldNames;
+
public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema,
Option<BloomFilter> bloomFilterOpt,
Properties properties) {
- super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+ this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema,
properties), bloomFilterOpt, properties);
+ }
+
+ private HoodieAvroWriteSupport(MessageType schema, HoodieSchema
hoodieSchema, HoodieSchema effectiveSchema,
+ Option<BloomFilter> bloomFilterOpt,
Properties properties) {
+ super(schema, effectiveSchema.toAvroSchema(),
ConvertingGenericData.INSTANCE);
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
this.properties = properties;
String vectorMeta =
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
if (!vectorMeta.isEmpty()) {
footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
}
+
+ this.effectiveAvroSchema = effectiveSchema.toAvroSchema();
+ this.variantWriteShreddingEnabled = Boolean.parseBoolean(
+ properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+ // Single pass over the effective schema fields: collect every
variant-typed field name (for the
+ // read-then-reshred guard) and, when shredding is enabled, the subset
that needs shredding.
+ Map<Integer, ShreddedVariantField> shreddedFields = new LinkedHashMap<>();
+ List<String> variantNames = new ArrayList<>();
+
+ if (effectiveSchema.getType() == HoodieSchemaType.RECORD) {
+ List<HoodieSchemaField> fields = effectiveSchema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ HoodieSchemaField field = fields.get(i);
+ HoodieSchema fieldSchema = field.schema();
+ // Unwrap nullable union to get the underlying type
+ if (fieldSchema.isNullable()) {
+ fieldSchema = fieldSchema.getNonNullType();
+ }
+ if (fieldSchema.getType() != HoodieSchemaType.VARIANT) {
+ continue;
+ }
+ variantNames.add(field.name());
+ if (variantWriteShreddingEnabled) {
+ HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
+ if (variant.isShredded() &&
variant.getTypedValueField().isPresent()) {
+ // Get the Avro sub-schema for this variant field from the
effective schema
+ Schema fieldAvroSchema =
effectiveAvroSchema.getFields().get(i).schema();
+ // Unwrap nullable union
+ if (fieldAvroSchema.getType() == Schema.Type.UNION) {
+ fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema);
+ }
+ shreddedFields.put(i, new ShreddedVariantField(fieldAvroSchema,
variant));
+ }
+ }
+ }
+ }
+
+ this.shreddedVariantFields = shreddedFields;
+ this.variantFieldNames = variantNames.toArray(new String[0]);
+
+ // Load shredding provider via reflection if needed
+ if (!shreddedVariantFields.isEmpty()) {
+ String providerClass =
properties.getProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key());
+ if (providerClass == null || providerClass.isEmpty()) {
+ throw new HoodieException("Variant write shredding is enabled and the
write schema requires shredding "
+ + "(typed_value columns present), but no VariantShreddingProvider
is configured or available on the "
+ + "classpath. Set " +
PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key() + " or add a provider "
+ + "implementation (e.g. the Spark variant module) to the
classpath.");
+ }
+ this.shreddingProvider = (VariantShreddingProvider)
ReflectionUtils.loadClass(providerClass);
+ } else {
+ this.shreddingProvider = null;
+ }
+ }
+
+ /**
+ * Generates the effective schema for writing, applying variant shredding
configuration.
+ *
+ * <p>When shredding is disabled, shredded variant fields are replaced with
unshredded
+ * variants (removing {@code typed_value}) so that the Parquet file does not
contain
+ * unused typed_value columns.</p>
+ *
+ * <p>When shredding is enabled and a forced shredding schema is configured
via
+ * {@link
HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST},
+ * all variant fields are replaced with shredded variants using the forced
schema.
+ * This handles the case where the input schema is unshredded but shredding
is desired.</p>
+ *
+ * <p>When shredding is enabled without a forced schema, the schema is
returned as-is
+ * (already-shredded variants stay shredded, unshredded variants stay
unshredded).</p>
+ *
+ * @param hoodieSchema the original HoodieSchema
+ * @param properties writer properties containing shredding configuration
+ * @return the effective schema to use for writing
+ */
+ public static HoodieSchema generateEffectiveSchema(HoodieSchema
hoodieSchema, Properties properties) {
+ boolean shreddingEnabled = Boolean.parseBoolean(
+ properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+ if (!shreddingEnabled) {
+ // Schemas from clustering/compaction may still be shredded (read from
on-disk Parquet files
+ // written with shredding enabled), so we need to strip typed_value when
shredding
+ // is disabled.
+ return stripVariantShredding(hoodieSchema);
+ }
+
+ // Check if a forced shredding schema is configured
+ String forceShreddingSchema = properties.getProperty(
+ PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key());
+ if (forceShreddingSchema != null && !forceShreddingSchema.isEmpty()) {
+ return applyForcedShreddingSchema(hoodieSchema, forceShreddingSchema);
+ }
+
+ // When enabled without forced schema, use the schema as-is
+ // (shredded variants stay shredded, unshredded variants stay unshredded)
+ return hoodieSchema;
+ }
+
+ /**
+ * Overloaded version accepting HoodieConfig for use by factories.
+ */
+ public static HoodieSchema generateEffectiveSchema(HoodieSchema
hoodieSchema, HoodieConfig config) {
+ return generateEffectiveSchema(hoodieSchema, config.getProps());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void write(T record) {
+ if (variantFieldNames.length > 0) {
+ assertInputNotAlreadyShredded((IndexedRecord) record);
+ }
+ if (!shreddedVariantFields.isEmpty() && shreddingProvider != null) {
+ super.write((T) shredRecord((IndexedRecord) record));
+ } else {
+ super.write(record);
+ }
+ }
+
+ /**
+ * Builds a shredded copy of {@code inputRecord}: variant fields configured
for shredding are
+ * transformed via the {@link VariantShreddingProvider} to populate {@code
typed_value}; all other
+ * fields are copied across as-is.
+ */
+ private GenericRecord shredRecord(IndexedRecord inputRecord) {
+ GenericRecord shreddedRecord = new GenericData.Record(effectiveAvroSchema);
+
+ // Copy all fields, transforming variant fields that need shredding
+ List<Schema.Field> effectiveFields = effectiveAvroSchema.getFields();
+ Schema inputSchema = inputRecord.getSchema();
+
+ for (int i = 0; i < effectiveFields.size(); i++) {
+ Schema.Field effectiveField = effectiveFields.get(i);
+ String fieldName = effectiveField.name();
+ Schema.Field inputField = inputSchema.getField(fieldName);
+ if (inputField == null) {
+ continue;
+ }
+
+ ShreddedVariantField shreddedField = shreddedVariantFields.get(i);
+ if (shreddedField != null) {
+ // This is a shredded variant field - transform it
+ Object fieldValue = inputRecord.get(inputField.pos());
+ if (fieldValue instanceof GenericRecord) {
+ GenericRecord variantRecord = (GenericRecord) fieldValue;
+ GenericRecord shreddedVariant = shreddingProvider.shredVariantRecord(
+ variantRecord,
+ shreddedField.avroSchema,
+ shreddedField.hoodieSchema);
+ shreddedRecord.put(i, shreddedVariant);
+ } else {
+ // Null or unexpected type - copy as-is
+ shreddedRecord.put(i, fieldValue);
+ }
+ } else {
+ // Non-variant field - copy as-is
+ shreddedRecord.put(i, inputRecord.get(inputField.pos()));
+ }
+ }
+
+ return shreddedRecord;
+ }
+
+ /**
+ * Fails fast on the not-yet-supported read-then-reshred path. Records read
from an already-shredded
+ * base file (compaction/clustering) arrive with a populated {@code
typed_value} and a possibly-null
+ * {@code value}. The writer has no logic to reconstruct the unshredded
variant, so shredding would
+ * silently drop the payload (shredding enabled) and the parquet writer
would reject the null at the
+ * REQUIRED {@code value} field (shredding disabled). Reconstruction is
tracked in
+ * https://github.com/apache/hudi/issues/18931.
+ */
+ private void assertInputNotAlreadyShredded(IndexedRecord inputRecord) {
+ Schema inputSchema = inputRecord.getSchema();
+ for (String fieldName : variantFieldNames) {
+ Schema.Field field = inputSchema.getField(fieldName);
+ if (field == null) {
+ continue;
+ }
+ Object value = inputRecord.get(field.pos());
+ if (value instanceof GenericRecord
+ && ((GenericRecord)
value).getSchema().getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD) !=
null) {
+ throw new HoodieException("Writing an already-shredded variant field
'" + fieldName
+ + "' is not supported yet. Compaction/clustering read a base file
written with variant "
+ + "shredding and re-wrote it through the Avro path; the reader
does not yet reconstruct "
+ + "the unshredded variant. Tracked in
https://github.com/apache/hudi/issues/18931.");
+ }
+ }
}
@Override
@@ -74,6 +319,179 @@ public class HoodieAvroWriteSupport<T> extends
AvroWriteSupport<T> {
footerMetadata.put(key, value);
}
+ /**
+ * Bundles the Avro sub-schema and {@link HoodieSchema.Variant} for a
shredded variant field,
+ * keyed by effective-schema field index in {@link #shreddedVariantFields}.
+ */
+ private static final class ShreddedVariantField {
+ private final Schema avroSchema;
+ private final HoodieSchema.Variant hoodieSchema;
+
+ ShreddedVariantField(Schema avroSchema, HoodieSchema.Variant hoodieSchema)
{
+ this.avroSchema = avroSchema;
+ this.hoodieSchema = hoodieSchema;
+ }
+ }
+
+ private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+ "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+ /**
+ * Applies a forced shredding schema to all variant fields in the given
schema.
+ * The forced schema DDL (e.g., {@code "a int, b string"}) defines the
typed_value
+ * fields that will be added to each variant column.
+ */
+ private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema,
String ddl) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ return schema;
+ }
+
+ Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl);
+
+ List<HoodieSchemaField> fields = schema.getFields();
+ List<HoodieSchemaField> newFields = new ArrayList<>();
+ boolean changed = false;
+
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema fieldSchema = field.schema();
+ boolean wasNullable = fieldSchema.isNullable();
+ HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() :
fieldSchema;
+
+ if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant shreddedVariant =
HoodieSchema.createVariantShreddedObject(
+ unwrapped.getAvroSchema().getName(),
+ unwrapped.getAvroSchema().getNamespace(),
+ unwrapped.getAvroSchema().getDoc(),
+ shreddedFields);
+ HoodieSchema replacement = wasNullable
+ ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
+
newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+ changed = true;
+ } else {
+ newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
+ }
+ }
+
+ if (!changed) {
+ return schema;
+ }
+
+ return HoodieSchema.createRecord(
+ schema.getAvroSchema().getName(),
+ schema.getAvroSchema().getNamespace(),
+ schema.getAvroSchema().getDoc(),
+ newFields);
+ }
+
+ /**
+ * Parses a DDL-style shredding schema string (e.g., {@code "a int, b
string, c decimal(15,1)"})
+ * into a map of field names to their HoodieSchema types.
+ */
+ private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) {
+ Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+ // Split on top-level commas only so parameterized types such as
decimal(15, 1) survive intact.
+ for (String fieldDef : StringUtils.splitTopLevelCommas(ddl)) {
+ String[] parts = fieldDef.split("\\s+", 2);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid shredding DDL field definition (expected 'name type'): "
+ fieldDef);
+ }
+ fields.put(parts[0].trim(), parseSimpleType(parts[1].trim()));
+ }
+ return fields;
+ }
+
+ /**
+ * Parses a simple type name into a HoodieSchema.
+ * Supports common types: int, long, string, double, float, boolean, binary,
decimal(p,s).
+ */
+ private static HoodieSchema parseSimpleType(String type) {
+ String lower = type.toLowerCase();
+ switch (lower) {
+ case "int":
+ case "integer":
+ return HoodieSchema.create(HoodieSchemaType.INT);
+ case "long":
+ case "bigint":
+ return HoodieSchema.create(HoodieSchemaType.LONG);
+ case "string":
+ return HoodieSchema.create(HoodieSchemaType.STRING);
+ case "double":
+ return HoodieSchema.create(HoodieSchemaType.DOUBLE);
+ case "float":
+ return HoodieSchema.create(HoodieSchemaType.FLOAT);
+ case "boolean":
+ return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+ case "binary":
+ return HoodieSchema.create(HoodieSchemaType.BYTES);
+ default:
+ Matcher m = DECIMAL_PATTERN.matcher(lower);
+ if (m.matches()) {
+ return HoodieSchema.createDecimal(
+ Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
+ }
+ throw new IllegalArgumentException("Unsupported shredding type: " +
type);
+ }
+ }
+
+ /**
+ * Strips shredding from variant fields in the schema.
+ * Replaces shredded variant fields with unshredded variants (removing
typed_value).
+ */
+ private static HoodieSchema stripVariantShredding(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ return schema;
+ }
+
+ List<HoodieSchemaField> fields = schema.getFields();
+ List<HoodieSchemaField> newFields = new ArrayList<>();
+ boolean changed = false;
+
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema fieldSchema = field.schema();
+ boolean wasNullable = fieldSchema.isNullable();
+ HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() :
fieldSchema;
+
+ if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant variant = (HoodieSchema.Variant) unwrapped;
+ if (variant.isShredded()) {
+ // Replace with unshredded variant
+ HoodieSchema.Variant unshredded = HoodieSchema.createVariant(
+ unwrapped.getAvroSchema().getName(),
+ unwrapped.getAvroSchema().getNamespace(),
+ unwrapped.getAvroSchema().getDoc());
+ HoodieSchema replacement = wasNullable ?
HoodieSchema.createNullable(unshredded) : unshredded;
+ newFields.add(field.withSchema(replacement));
+ changed = true;
+ continue;
+ }
+ }
+ newFields.add(field);
+ }
+
+ if (!changed) {
+ return schema;
+ }
+
+ return HoodieSchema.createRecord(
+ schema.getAvroSchema().getName(),
+ schema.getAvroSchema().getNamespace(),
+ schema.getAvroSchema().getDoc(),
+ newFields);
+ }
+
+ /**
+ * Extracts the non-null type from a union schema.
+ */
+ private static Schema getNonNullFromUnion(Schema unionSchema) {
+ for (Schema type : unionSchema.getTypes()) {
+ if (type.getType() != Schema.Type.NULL) {
+ return type;
+ }
+ }
+ throw new IllegalArgumentException("Union schema does not contain a
non-null type: " + unionSchema);
+ }
+
private static class HoodieBloomFilterAvroWriteSupport extends
HoodieBloomFilterWriteSupport<String> {
public HoodieBloomFilterAvroWriteSupport(BloomFilter bloomFilter) {
super(bloomFilter);
@@ -84,4 +502,4 @@ public class HoodieAvroWriteSupport<T> extends
AvroWriteSupport<T> {
return StringUtils.getUTF8Bytes(key);
}
}
-}
+}
\ No newline at end of file
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
index 83ebc7159e01..4d2630d2e7aa 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieParquetConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -54,10 +55,16 @@ import java.io.OutputStream;
import java.util.Properties;
import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WRITER_TO_ALLOW_DUPLICATES;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
import static
org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
+ // Sole provider today: variant shredding currently requires Spark 4.0+. If
more providers are
+ // added, restore a candidate list here and try each in turn.
+ private static final String SPARK4_VARIANT_SHREDDING_PROVIDER =
+ "org.apache.hudi.variant.Spark4VariantShreddingProvider";
+
public HoodieAvroFileWriterFactory(HoodieStorage storage) {
super(storage);
}
@@ -140,9 +147,37 @@ public class HoodieAvroFileWriterFactory extends
HoodieFileWriterFactory {
StorageConfiguration storageConf,
boolean
enableBloomFilter) {
Option<BloomFilter> filter = enableBloomFilter ?
Option.of(createBloomFilter(config)) : Option.empty();
+ HoodieSchema effectiveSchema =
HoodieAvroWriteSupport.generateEffectiveSchema(schema, config);
+ // Work on a copy so we never mutate the shared config's internal
Properties.
+ Properties props = TypedProperties.copy(config.getProps());
+ // Auto-detect variant shredding provider from classpath if not explicitly
configured
+ if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
+ String detectedClass = detectShreddingProviderClass();
+ if (detectedClass != null) {
+ props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(),
detectedClass);
+ }
+ }
return (HoodieAvroWriteSupport) ReflectionUtils.loadClass(
config.getStringOrDefault(HoodieStorageConfig.HOODIE_AVRO_WRITE_SUPPORT_CLASS),
new Class<?>[] {MessageType.class, HoodieSchema.class, Option.class,
Properties.class},
- getAvroSchemaConverter((Configuration)
storageConf.unwrapAs(Configuration.class)).convert(schema), schema, filter,
config.getProps());
+ // Build the Parquet schema from the effective (possibly shredded)
schema so the message type
+ // matches the records actually written - a shredded variant has a
nullable value and a
+ // typed_value column; converting the original schema would mark value
REQUIRED and drop
+ // typed_value, failing the write with "Null-value for required field:
value".
+ getAvroSchemaConverter((Configuration)
storageConf.unwrapAs(Configuration.class)).convert(effectiveSchema), schema,
filter, props);
+ }
+
+  /**
+   * Auto-detects a {@link org.apache.hudi.avro.VariantShreddingProvider} implementation
+   * available on the classpath.
+   *
+   * <p>The lookup does not initialize the class: this probe only needs to know whether the
+   * provider can be loaded, so it must not run static initializers as a side effect. A
+   * {@link LinkageError} (for example a {@link NoClassDefFoundError} from a partially available
+   * provider jar) is treated like an absent class instead of propagating to the caller.
+   *
+   * @return the fully-qualified provider class name if it can be loaded, or {@code null} otherwise
+   */
+  private static String detectShreddingProviderClass() {
+    try {
+      Class.forName(SPARK4_VARIANT_SHREDDING_PROVIDER, false,
+          HoodieAvroFileWriterFactory.class.getClassLoader());
+      return SPARK4_VARIANT_SHREDDING_PROVIDER;
+    } catch (ClassNotFoundException | LinkageError e) {
+      // Provider is not on the classpath, or is present but cannot be linked.
+      return null;
+    }
+  }
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupportShredding.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupportShredding.java
new file mode 100644
index 000000000000..eb936ac512e4
--- /dev/null
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupportShredding.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestHoodieAvroWriteSupportShredding {
+
+  /**
+   * Parameterized types such as {@code decimal(15, 1)} carry a comma of their own, and that form
+   * is the documented example on
+   * {@link HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST}. The forced
+   * shredding DDL must keep such a type in one piece; a naive {@code ddl.split(",")} tore the
+   * field apart and threw "Unsupported shredding type: decimal(15".
+   */
+  @Test
+  void forcedShreddingDdlTreatsDecimalParensAsOneField() {
+    Properties props = new Properties();
+    props.setProperty(HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(), "true");
+    props.setProperty(HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key(),
+        "a int, b string, c decimal(15, 1)");
+
+    HoodieSchemaField variantColumn = HoodieSchemaField.of("v", HoodieSchema.createVariant());
+    HoodieSchema record = HoodieSchema.createRecord(
+        "test_record", "org.apache.hudi.test", null, Collections.singletonList(variantColumn));
+
+    HoodieSchema effective = HoodieAvroWriteSupport.generateEffectiveSchema(record, props);
+
+    // Locate typed_value inside the (possibly nullable) variant column.
+    HoodieSchema variant = stripNullable(effective.getFields().get(0).schema());
+    HoodieSchema typedValueField = null;
+    for (HoodieSchemaField candidate : variant.getFields()) {
+      if (HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD.equals(candidate.name())) {
+        typedValueField = candidate.schema();
+        break;
+      }
+    }
+    if (typedValueField == null) {
+      throw new AssertionError("shredded variant is missing typed_value: " + variant);
+    }
+
+    // Exactly the three declared fields must come out, in declaration order.
+    List<String> shreddedFieldNames = stripNullable(typedValueField).getFields().stream()
+        .map(HoodieSchemaField::name)
+        .collect(Collectors.toList());
+    assertEquals(Arrays.asList("a", "b", "c"), shreddedFieldNames);
+  }
+
+  /** Returns the non-null branch of a nullable schema, or the schema itself otherwise. */
+  private static HoodieSchema stripNullable(HoodieSchema schema) {
+    return schema.isNullable() ? schema.getNonNullType() : schema;
+  }
+}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
index e4e86abef939..9cbee1b44cbc 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
@@ -24,15 +24,22 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.HoodieParquetConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import lombok.Getter;
import lombok.Setter;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -41,12 +48,17 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Properties;
import static
org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
import static
org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieBaseParquetWriter {
@@ -118,4 +130,43 @@ public class TestHoodieBaseParquetWriter {
"The writer stops write new records while the file doesn't reach the
max file size limit");
}
}
+
+  @Test
+  public void testRejectAlreadyShreddedVariantInput() {
+    // Guard for the read-then-reshred path (compaction/clustering over an already-shredded base
+    // file), tracked in https://github.com/apache/hudi/issues/18931: the Avro write support
+    // cannot reconstruct the unshredded variant yet, so it must fail fast rather than silently
+    // drop the payload.
+
+    // Shredding disabled so no VariantShreddingProvider is required; the variant field is still tracked.
+    Properties writerProps = new Properties();
+    writerProps.setProperty(HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(), "false");
+
+    HoodieSchema tableSchema = HoodieSchema.createRecord("guardTest", null, null, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
+        HoodieSchemaField.of("v", HoodieSchema.createVariant())));
+    HoodieSchema effectiveSchema = HoodieAvroWriteSupport.generateEffectiveSchema(tableSchema, writerProps);
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(effectiveSchema.toAvroSchema()),
+        tableSchema, Option.empty(), writerProps);
+
+    // Avro schema of a variant that is already shredded, i.e. one that declares typed_value.
+    Map<String, HoodieSchema> typedFields = new LinkedHashMap<>();
+    typedFields.put("a", HoodieSchema.create(HoodieSchemaType.INT));
+    Schema shreddedVariantSchema =
+        HoodieSchema.createVariantShreddedObject(null, null, null, typedFields).getAvroSchema();
+
+    Schema inputSchema = Schema.createRecord("guardInput", null, null, false);
+    inputSchema.setFields(Arrays.asList(
+        new Schema.Field("id", Schema.create(Schema.Type.INT), null, (Object) null),
+        new Schema.Field("v", shreddedVariantSchema, null, (Object) null)));
+
+    // Only metadata is populated on the variant value; value and typed_value are left unset.
+    GenericRecord shreddedVariant = new GenericData.Record(shreddedVariantSchema);
+    shreddedVariant.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD, ByteBuffer.wrap(new byte[] {1}));
+    GenericRecord input = new GenericData.Record(inputSchema);
+    input.put("id", 1);
+    input.put("v", shreddedVariant);
+
+    HoodieException ex = assertThrows(HoodieException.class, () -> writeSupport.write(input));
+    String message = ex.getMessage();
+    assertTrue(message.contains("already-shredded") && message.contains("18931"),
+        "Expected the read-then-reshred guard message, got: " + message);
+  }
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
index 3c5545a6bf71..a49a11362288 100644
--- a/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/common/util/StringUtils.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -185,6 +186,41 @@ public class StringUtils {
return Stream.of(input.split(delimiter)).map(String::trim).filter(s ->
!s.isEmpty()).collect(Collectors.toList());
}
+  /**
+   * Splits {@code input} on commas that are not nested inside parentheses, trimming each
+   * segment and dropping empty ones. Unlike {@link #split(String, String)}, a comma inside a
+   * parenthesised descriptor -- e.g. {@code decimal(15, 1)} or {@code VECTOR(128, DOUBLE)} -- is
+   * kept as part of the segment instead of being treated as a separator.
+   *
+   * <p>An unmatched {@code ')'} stays in its segment but never pushes the nesting depth below
+   * zero. Without that clamp a stray closing parenthesis would leave the depth at -1, so the
+   * next {@code '('} would bring it back to 0 and a comma inside that balanced group would be
+   * split: {@code "x), decimal(15, 1)"} would yield {@code ["x), decimal(15", "1)"]}.
+   *
+   * @param input comma-separated text; may be {@code null}
+   * @return the trimmed, non-empty top-level segments in input order; an empty list for
+   *         {@code null} or empty input
+   */
+  public static List<String> splitTopLevelCommas(@Nullable String input) {
+    if (isNullOrEmpty(input)) {
+      return Collections.emptyList();
+    }
+    List<String> parts = new ArrayList<>();
+    int depth = 0;
+    int start = 0;
+    for (int i = 0; i < input.length(); i++) {
+      char c = input.charAt(i);
+      if (c == '(') {
+        depth++;
+      } else if (c == ')') {
+        // Clamp at zero so an unbalanced ')' cannot corrupt nesting tracking for later groups.
+        if (depth > 0) {
+          depth--;
+        }
+      } else if (c == ',' && depth == 0) {
+        addTrimmedNonEmpty(parts, input.substring(start, i));
+        start = i + 1;
+      }
+    }
+    // Whatever follows the last top-level comma is the final segment.
+    addTrimmedNonEmpty(parts, input.substring(start));
+    return parts;
+  }
+
+  /** Appends the trimmed form of {@code segment} to {@code parts} unless trimming leaves nothing. */
+  private static void addTrimmedNonEmpty(List<String> parts, String segment) {
+    String candidate = segment.trim();
+    if (candidate.isEmpty()) {
+      return;
+    }
+    parts.add(candidate);
+  }
+
public static String getSuffixBy(String input, int ch) {
int i = input.lastIndexOf(ch);
if (i == -1) {
diff --git
a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
index 43a547744baa..3be3bfe3f998 100644
--- a/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
+++ b/hudi-io/src/test/java/org/apache/hudi/common/util/TestStringUtils.java
@@ -101,6 +101,21 @@ public class TestStringUtils {
assertEquals("a,", String.join(",", Arrays.asList("a", "")));
}
+  @Test
+  public void testSplitTopLevelCommas() {
+    // Commas inside parentheses must not split the segment (decimal shredding DDL + VECTOR descriptors).
+    assertEquals(Arrays.asList("a int", "b string", "c decimal(15, 1)"),
+        StringUtils.splitTopLevelCommas("a int, b string, c decimal(15, 1)"));
+    assertEquals(Arrays.asList("v1:VECTOR(128, DOUBLE)", "v2:VECTOR(64)"),
+        StringUtils.splitTopLevelCommas("v1:VECTOR(128, DOUBLE), v2:VECTOR(64)"));
+    // Parentheses nested more than one level deep still form a single segment.
+    assertEquals(Arrays.asList("m map(a, struct(b, c))", "n int"),
+        StringUtils.splitTopLevelCommas("m map(a, struct(b, c)), n int"));
+    // Segments are trimmed and empty ones dropped.
+    assertEquals(Arrays.asList("a int", "b string"),
+        StringUtils.splitTopLevelCommas(" a int ,, b string "));
+    // Null / empty / blank input yields an empty list.
+    assertEquals(Collections.emptyList(), StringUtils.splitTopLevelCommas(null));
+    assertEquals(Collections.emptyList(), StringUtils.splitTopLevelCommas(""));
+    assertEquals(Collections.emptyList(), StringUtils.splitTopLevelCommas("   "));
+    // Input made only of separators and whitespace has no segments either.
+    assertEquals(Collections.emptyList(), StringUtils.splitTopLevelCommas(" , ,, "));
+  }
+
@Test
public void testStringNullToEmpty() {
String str = "This is a test";
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index e364772e0596..3108b96fdd18 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -27,6 +27,10 @@ import org.apache.hudi.internal.schema.HoodieSchemaException
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.schema.{GroupType, MessageType, Type}
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType}
@@ -542,4 +546,158 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
spark.sql(s"drop table $tableName")
}
+
+  test("Test Shredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+
+    // Session settings installed by this test. They are removed again in the finally block so
+    // that neither a passing nor a failing run leaves shredding (or the forced test schema)
+    // switched on for whatever runs next on the same SparkSession.
+    val shreddingConfKeys = Seq(
+      "hoodie.parquet.variant.write.shredding.enabled",
+      "hoodie.parquet.variant.allow.reading.shredded",
+      "hoodie.parquet.variant.force.shredding.schema.for.test")
+
+    // Shredding enabled with forced schema → parquet should have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | v variant,
+           | ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           | primaryKey = 'id',
+           | type = 'cow',
+           | preCombineField = 'ts'
+           | )
+         """.stripMargin)
+
+      try {
+        spark.sql("set hoodie.parquet.variant.write.shredding.enabled = true")
+        spark.sql("set hoodie.parquet.variant.allow.reading.shredded = true")
+        spark.sql("set hoodie.parquet.variant.force.shredding.schema.for.test = a int, b string")
+
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             | (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+           """.stripMargin)
+        // Reading shredded variants back needs Spark 4.1+ (spark.sql.variant.allowReadingShredded);
+        // Spark 4.0's reader rejects the 3-field shredded layout (4.0 read support is added later, see
+        // https://github.com/apache/hudi/issues/18931). The shredded write is still validated below.
+        if (HoodieSparkUtils.gteqSpark4_1) {
+          checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+            Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+          )
+        }
+
+        // Verify parquet schema has shredded structure with typed_value
+        val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+        assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+        parquetFiles.foreach { filePath =>
+          val schema = readParquetSchema(filePath)
+          val variantGroup = getFieldAsGroup(schema, "v")
+          assert(variantGroup.containsField("typed_value"),
+            s"Shredded variant should have typed_value field. Schema:\n$variantGroup")
+          val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+          assert(valueField.getRepetition == Type.Repetition.OPTIONAL,
+            "Shredded variant value field should be OPTIONAL")
+          val metadataField = variantGroup.getType(variantGroup.getFieldIndex("metadata"))
+          assert(metadataField.getRepetition == Type.Repetition.REQUIRED,
+            "Shredded variant metadata field should be REQUIRED")
+        }
+      } finally {
+        shreddingConfKeys.foreach(key => spark.conf.unset(key))
+      }
+    })
+  }
+
+  test("Test Unshredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+
+    // The only session setting this test installs; removed again in the finally block so it
+    // does not stay set for whatever runs next on the same SparkSession.
+    val shreddingEnabledKey = "hoodie.parquet.variant.write.shredding.enabled"
+
+    // Shredding disabled → parquet should NOT have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | v variant,
+           | ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           | primaryKey = 'id',
+           | type = 'cow',
+           | preCombineField = 'ts'
+           | )
+         """.stripMargin)
+
+      try {
+        spark.sql("set hoodie.parquet.variant.write.shredding.enabled = false")
+
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             | (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+           """.stripMargin)
+
+        // Verify data can be read back for the non-shredded case
+        checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+          Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+        )
+
+        // Verify parquet schema does NOT have typed_value
+        val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+        assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+        parquetFiles.foreach { filePath =>
+          val schema = readParquetSchema(filePath)
+          val variantGroup = getFieldAsGroup(schema, "v")
+          assert(!variantGroup.containsField("typed_value"),
+            s"Non-shredded variant should NOT have typed_value field. Schema:\n$variantGroup")
+          val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+          assert(valueField.getRepetition == Type.Repetition.REQUIRED,
+            "Non-shredded variant value field should be REQUIRED")
+        }
+      } finally {
+        spark.conf.unset(shreddingEnabledKey)
+      }
+    })
+  }
+
+  /**
+   * Recursively walks `tablePath` and returns the full path of every file whose name ends in
+   * `.parquet`, skipping any path that contains `.hoodie`.
+   */
+  private def listDataParquetFiles(tablePath: String): Seq[String] = {
+    val root = new HadoopPath(tablePath)
+    val fs = FileSystem.get(root.toUri, spark.sparkContext.hadoopConfiguration)
+    val remoteFiles = fs.listFiles(root, true)
+    // RemoteIterator is not a Scala iterator, so drain it through Iterator.continually.
+    Iterator.continually(remoteFiles)
+      .takeWhile(_.hasNext)
+      .map(_.next().getPath.toString)
+      .filter(candidate => candidate.endsWith(".parquet") && !candidate.contains(".hoodie"))
+      .toList
+  }
+
+  /**
+   * Opens the parquet file at `filePath` and returns the schema (MessageType) stored in its
+   * footer. The reader is closed before returning, whether or not reading the footer succeeds.
+   */
+  private def readParquetSchema(filePath: String): MessageType = {
+    val footerReader = ParquetFileReader.open(
+      HadoopInputFile.fromPath(new HadoopPath(filePath), spark.sparkContext.hadoopConfiguration))
+    try footerReader.getFooter.getFileMetaData.getSchema
+    finally footerReader.close()
+  }
+
+  /**
+   * Looks up the child of `parent` called `fieldName` and returns it as a GroupType.
+   * The lookup goes through getFieldIndex(String) and getType(int) with an explicitly typed
+   * index, to avoid Scala overload resolution issues.
+   */
+  private def getFieldAsGroup(parent: GroupType, fieldName: String): GroupType = {
+    val position: Int = parent.getFieldIndex(fieldName)
+    val child = parent.getType(position)
+    child.asGroupType()
+  }
}
diff --git
a/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
new file mode 100644
index 000000000000..a0aa663f9463
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.variant;
+
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.types.variant.Variant;
+import org.apache.spark.types.variant.VariantSchema;
+import org.apache.spark.types.variant.VariantShreddingWriter;
+import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult;
+import
org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Implementation of {@link VariantShreddingProvider} using Spark 4's variant
parsing library.
+ *
+ * <p>This class bridges the Avro record path and Spark's {@link
VariantShreddingWriter}
+ * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. It
converts
+ * the shredded output into Avro {@link GenericRecord}s that can be written via
+ * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p>
+ *
+ * <p>The shredding logic is delegated to {@link
VariantShreddingWriter#castShredded},
+ * which handles scalar, object, and array shredding including residual value
construction
+ * for non-matching fields. This class implements the {@link ShreddedResult}
and
+ * {@link ShreddedResultBuilder} interfaces to collect the shredded components
into
+ * Avro GenericRecords.</p>
+ */
+public class Spark4VariantShreddingProvider implements
VariantShreddingProvider {
+
+  /**
+   * Shreds one unshredded variant record into the layout described by {@code shreddedSchema}.
+   *
+   * @return the shredded record, or {@code null} when the input has no {@code value} or no
+   *         {@code metadata}
+   */
+  @Override
+  public GenericRecord shredVariantRecord(
+      GenericRecord unshreddedVariant,
+      Schema shreddedSchema,
+      HoodieSchema.Variant variantSchema) {
+    ByteBuffer rawValue = (ByteBuffer) unshreddedVariant.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD);
+    ByteBuffer rawMetadata = (ByteBuffer) unshreddedVariant.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+    if (rawValue == null || rawMetadata == null) {
+      return null;
+    }
+
+    Variant variant = new Variant(toByteArray(rawValue), toByteArray(rawMetadata));
+
+    // Translating the Avro schema also registers the Avro schema of every nesting level with
+    // the builder, which it needs to construct GenericRecords.
+    AvroShreddedResultBuilder resultBuilder = new AvroShreddedResultBuilder();
+    VariantSchema targetSchema = buildVariantSchema(shreddedSchema, true, resultBuilder);
+
+    // The shredding itself is done by Spark's VariantShreddingWriter.
+    ShreddedResult shredded = VariantShreddingWriter.castShredded(variant, targetSchema, resultBuilder);
+    return ((AvroShreddedResult) shredded).toGenericRecord();
+  }
+
+ /**
+ * Builds a {@link VariantSchema} from an Avro {@link Schema} representing a
+ * shredded variant structure ({@code value}, {@code metadata}, {@code
typed_value}).
+ *
+ * <p>This method also registers the Avro schema mapping in the builder so
that
+ * {@link AvroShreddedResultBuilder#createEmpty} can create results with the
+ * correct Avro schema at each nesting level.</p>
+ *
+ * @param avroSchema Avro record schema of the current nesting level
+ * @param isTopLevel whether this is the outermost variant struct; only then is
+ * a metadata index handed to the resulting schema
+ * @param builder receives the (VariantSchema, Avro schema) pair for this level
+ * and, through the recursive calls, for every nested level
+ * @return the schema for this level, with nested levels already built
+ */
+ private VariantSchema buildVariantSchema(Schema avroSchema, boolean
isTopLevel,
+ AvroShreddedResultBuilder builder) {
+ Schema.Field valueField =
avroSchema.getField(HoodieSchema.Variant.VARIANT_VALUE_FIELD);
+ Schema.Field metadataField =
avroSchema.getField(HoodieSchema.Variant.VARIANT_METADATA_FIELD);
+ Schema.Field typedValueField =
avroSchema.getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+
+ // Hand out consecutive indices in the fixed order value, metadata, typed_value,
+ // counting only the fields that exist in avroSchema; a missing field gets -1.
+ // NOTE(review): the indices follow this fixed order, not the field positions in
+ // avroSchema - confirm nothing downstream relies on the Avro field order.
+ int fieldCount = 0;
+ int variantIdx = valueField != null ? fieldCount++ : -1;
+ int topLevelMetadataIdx;
+ if (metadataField != null && isTopLevel) {
+ topLevelMetadataIdx = fieldCount++;
+ } else {
+ topLevelMetadataIdx = -1;
+ // A metadata field below the top level still occupies a slot, but its index is not exposed.
+ if (metadataField != null) {
+ fieldCount++;
+ }
+ }
+ int typedIdx = typedValueField != null ? fieldCount++ : -1;
+ int numFields = fieldCount;
+
+ // At most one of these three is assigned by the switch below; all stay null
+ // when there is no typed_value field.
+ VariantSchema.ScalarType scalarSchema = null;
+ VariantSchema.ObjectField[] objectSchema = null;
+ VariantSchema arraySchema = null;
+
+ if (typedValueField != null) {
+ Schema tvSchema = unwrapNullable(typedValueField.schema());
+
+ switch (tvSchema.getType()) {
+ case RECORD:
+ // Object shredding: each field has a nested {value, typed_value}
sub-struct
+ List<VariantSchema.ObjectField> fields = new ArrayList<>();
+ for (Schema.Field field : tvSchema.getFields()) {
+ Schema fieldSchema = unwrapNullable(field.schema());
+ VariantSchema subSchema = buildVariantSchema(fieldSchema, false,
builder);
+ fields.add(new VariantSchema.ObjectField(field.name(), subSchema));
+ }
+ objectSchema = fields.toArray(new VariantSchema.ObjectField[0]);
+ break;
+
+ case ARRAY:
+ // Array shredding: elements follow the shredding schema
+ Schema elementSchema = unwrapNullable(tvSchema.getElementType());
+ arraySchema = buildVariantSchema(elementSchema, false, builder);
+ break;
+
+ default:
+ // Scalar shredding
+ scalarSchema = avroTypeToScalarType(tvSchema);
+ break;
+ }
+ }
+
+ VariantSchema result = new VariantSchema(
+ typedIdx, variantIdx, topLevelMetadataIdx, numFields,
+ scalarSchema, objectSchema, arraySchema);
+
+ // Registered after the nested levels, which register themselves during the recursion above.
+ builder.registerSchema(result, avroSchema);
+
+ return result;
+ }
+
+  /**
+   * Maps an Avro {@link Schema} type (potentially with logical type annotations)
+   * to a {@link VariantSchema.ScalarType}.
+   *
+   * <p>Millisecond timestamp logical types ({@code timestamp-millis},
+   * {@code local-timestamp-millis}) are deliberately left unmapped. Variant timestamps carry
+   * microsecond precision, and the shredded scalar is stored in the Avro record without any
+   * unit conversion, so shredding into a millisecond column would store microsecond values
+   * under a millisecond annotation. Returning {@code null} leaves the column without a scalar
+   * shredding type.
+   *
+   * @param schema the non-null Avro schema of a {@code typed_value} field
+   * @return the matching scalar type, or {@code null} when the Avro type has no scalar mapping
+   */
+  private VariantSchema.ScalarType avroTypeToScalarType(Schema schema) {
+    LogicalType logicalType = schema.getLogicalType();
+
+    // Logical types take precedence over the underlying physical type.
+    if (logicalType instanceof LogicalTypes.Decimal) {
+      LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+      return new VariantSchema.DecimalType(decimal.getPrecision(), decimal.getScale());
+    }
+    if (logicalType != null) {
+      switch (logicalType.getName()) {
+        case "date":
+          return new VariantSchema.DateType();
+        case "timestamp-micros":
+          return new VariantSchema.TimestampType();
+        case "local-timestamp-micros":
+          return new VariantSchema.TimestampNTZType();
+        case "uuid":
+          return new VariantSchema.UuidType();
+        case "timestamp-millis":
+        case "local-timestamp-millis":
+          // Unit mismatch with the microsecond variant timestamp: do not shred.
+          return null;
+        default:
+          // Any other logical type is shredded according to its physical type below.
+          break;
+      }
+    }
+
+    switch (schema.getType()) {
+      case BOOLEAN:
+        return new VariantSchema.BooleanType();
+      case INT:
+        return new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT);
+      case LONG:
+        return new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG);
+      case FLOAT:
+        return new VariantSchema.FloatType();
+      case DOUBLE:
+        return new VariantSchema.DoubleType();
+      case STRING:
+        return new VariantSchema.StringType();
+      case BYTES:
+        return new VariantSchema.BinaryType();
+      case FIXED:
+        // NOTE(review): binary scalars are later wrapped in a ByteBuffer regardless of the Avro
+        // type; confirm the Avro writer accepts a ByteBuffer for a FIXED field.
+        return new VariantSchema.BinaryType();
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Returns the first non-null branch of a union schema. A schema that is not a union, or a
+   * union with no non-null branch, is returned unchanged.
+   */
+  private static Schema unwrapNullable(Schema schema) {
+    if (schema.getType() != Schema.Type.UNION) {
+      return schema;
+    }
+    return schema.getTypes().stream()
+        .filter(branch -> branch.getType() != Schema.Type.NULL)
+        .findFirst()
+        .orElse(schema);
+  }
+
+  /**
+   * Returns the remaining bytes of {@code buffer} as an array without moving its position.
+   * When the buffer exposes a backing array that it covers exactly, that array itself is
+   * returned (no copy); otherwise the bytes are copied into a new array.
+   */
+  private static byte[] toByteArray(ByteBuffer buffer) {
+    final int length = buffer.remaining();
+    boolean needsCopy = !buffer.hasArray()
+        || buffer.position() != 0
+        || buffer.arrayOffset() != 0
+        || length != buffer.array().length;
+    if (!needsCopy) {
+      return buffer.array();
+    }
+    byte[] copy = new byte[length];
+    // Read through a duplicate so the caller's position and limit stay untouched.
+    buffer.duplicate().get(copy);
+    return copy;
+  }
+
+ /**
+ * {@link ShreddedResult} implementation that collects shredded variant
components
+ * and converts them into an Avro {@link GenericRecord}.
+ */
+ static class AvroShreddedResult implements ShreddedResult {
+ private final VariantSchema variantSchema;
+ private final Schema avroSchema;
+
+ private byte[] metadata;
+ private byte[] variantValue;
+ private Object scalarValue;
+ private AvroShreddedResult[] objectFields;
+ private AvroShreddedResult[] arrayElements;
+
+ AvroShreddedResult(VariantSchema variantSchema, Schema avroSchema) {
+ this.variantSchema = variantSchema;
+ this.avroSchema = avroSchema;
+ }
+
+ @Override
+ public void addArray(ShreddedResult[] array) {
+ this.arrayElements = new AvroShreddedResult[array.length];
+ for (int i = 0; i < array.length; i++) {
+ this.arrayElements[i] = (AvroShreddedResult) array[i];
+ }
+ }
+
+ @Override
+ public void addObject(ShreddedResult[] values) {
+ this.objectFields = new AvroShreddedResult[values.length];
+ for (int i = 0; i < values.length; i++) {
+ this.objectFields[i] = (AvroShreddedResult) values[i];
+ }
+ }
+
+ @Override
+ public void addVariantValue(byte[] result) {
+ this.variantValue = result;
+ }
+
+ @Override
+ public void addScalar(Object result) {
+ this.scalarValue = result;
+ }
+
+ @Override
+ public void addMetadata(byte[] result) {
+ this.metadata = result;
+ }
+
+ /**
+ * Converts the collected shredded components into an Avro {@link
GenericRecord}.
+ */
+ GenericRecord toGenericRecord() {
+ GenericRecord record = new GenericData.Record(avroSchema);
+
+ // Metadata (only present at top level)
+ if (metadata != null) {
+ record.put(HoodieSchema.Variant.VARIANT_METADATA_FIELD,
ByteBuffer.wrap(metadata));
+ }
+
+ // Value (variant binary for non-shredded or residual data)
+ Schema.Field valueField =
avroSchema.getField(HoodieSchema.Variant.VARIANT_VALUE_FIELD);
+ if (valueField != null) {
+ if (variantValue != null) {
+ record.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD,
ByteBuffer.wrap(variantValue));
+ } else {
+ record.put(HoodieSchema.Variant.VARIANT_VALUE_FIELD, null);
+ }
+ }
+
+ // Typed value
+ Schema.Field tvField =
avroSchema.getField(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD);
+ if (tvField == null) {
+ return record;
+ }
+
+ if (scalarValue != null) {
+ Schema tvSchema = unwrapNullable(tvField.schema());
+ record.put(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD,
convertScalarToAvro(scalarValue, tvSchema));
+ } else if (objectFields != null) {
+ Schema tvSchema = unwrapNullable(tvField.schema());
+ GenericRecord tvRecord = new GenericData.Record(tvSchema);
+ for (int i = 0; i < objectFields.length; i++) {
+ String fieldName = variantSchema.objectSchema[i].fieldName;
+ // Always create the sub-record even for missing fields (non-null struct with null fields)
+ tvRecord.put(fieldName, objectFields[i].toGenericRecord());
+ }
+ record.put(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD, tvRecord);
+ } else if (arrayElements != null) {
+ List<GenericRecord> list = new ArrayList<>(arrayElements.length);
+ for (AvroShreddedResult element : arrayElements) {
+ list.add(element.toGenericRecord());
+ }
+ record.put(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD, list);
+ } else {
+ record.put(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD, null);
+ }
+
+ return record;
+ }
+
+ /**
+ * Converts a scalar value from Spark's variant representation to an Avro-compatible type.
+ * Handles type widening (Byte/Short to Int/Long) and binary wrapping.
+ */
+ private static Object convertScalarToAvro(Object value, Schema avroSchema)
{
+ if (value instanceof byte[]) {
+ return ByteBuffer.wrap((byte[]) value);
+ }
+ if (value instanceof UUID) {
+ return value.toString();
+ }
+ // Widen integer types to match Avro schema expectations
+ if (avroSchema.getType() == Schema.Type.INT) {
+ if (value instanceof Byte) {
+ return ((Byte) value).intValue();
+ }
+ if (value instanceof Short) {
+ return ((Short) value).intValue();
+ }
+ }
+ if (avroSchema.getType() == Schema.Type.LONG) {
+ if (value instanceof Byte) {
+ return ((Byte) value).longValue();
+ }
+ if (value instanceof Short) {
+ return ((Short) value).longValue();
+ }
+ if (value instanceof Integer) {
+ return ((Integer) value).longValue();
+ }
+ }
+ // BigDecimal, Boolean, String, Integer, Long, Float, Double
+ // are directly compatible with Avro's type system
+ return value;
+ }
+ }
+
+ /**
+ * {@link ShreddedResultBuilder} that creates {@link AvroShreddedResult} instances
+ * with the corresponding Avro schema at each nesting level.
+ */
+ static class AvroShreddedResultBuilder implements ShreddedResultBuilder {
+ private final Map<VariantSchema, Schema> schemaMap = new
IdentityHashMap<>();
+
+ void registerSchema(VariantSchema variantSchema, Schema avroSchema) {
+ schemaMap.put(variantSchema, avroSchema);
+ }
+
+ @Override
+ public ShreddedResult createEmpty(VariantSchema schema) {
+ Schema avroSchema = schemaMap.get(schema);
+ if (avroSchema == null) {
+ throw new IllegalStateException(
+ "No Avro schema registered for VariantSchema: " + schema);
+ }
+ return new AvroShreddedResult(schema, avroSchema);
+ }
+
+ @Override
+ public boolean allowNumericScaleChanges() {
+ return true;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index ea6e96943a69..a679c7e62bab 100644
---
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -180,18 +180,20 @@ abstract class BaseSpark4Adapter extends SparkAdapter
with Logging {
override def isDataTypeEqualForPhysicalSchema(requiredType: DataType,
fileType: DataType): Option[Boolean] = {
/**
* Checks if a StructType is the physical representation of VariantType in Parquet.
- * VariantType is stored in Parquet as a struct with two binary fields:
"metadata" and "value".
+ * VariantType is stored in Parquet as a struct with binary fields: "metadata" and "value".
+ * Supports both unshredded (2 fields) and shredded (3 fields with "typed_value") layouts.
*/
+ // TODO(voon) parquet-1.16: replace this name/arity shape heuristic with a VariantLogicalTypeAnnotation check once all supported parquet versions are >= 1.16.
def isVariantPhysicalSchema(structType: StructType): Boolean = {
- if (structType.fields.length != 2) {
- false
- } else {
- val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
- fieldMap.contains(HoodieSchema.Variant.VARIANT_VALUE_FIELD) &&
- fieldMap.contains(HoodieSchema.Variant.VARIANT_METADATA_FIELD) &&
- fieldMap(HoodieSchema.Variant.VARIANT_VALUE_FIELD) == BinaryType &&
- fieldMap(HoodieSchema.Variant.VARIANT_METADATA_FIELD) == BinaryType
- }
+ val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
+ val hasRequiredFields =
fieldMap.contains(HoodieSchema.Variant.VARIANT_VALUE_FIELD) &&
+ fieldMap.contains(HoodieSchema.Variant.VARIANT_METADATA_FIELD) &&
+ fieldMap(HoodieSchema.Variant.VARIANT_VALUE_FIELD) == BinaryType &&
+ fieldMap(HoodieSchema.Variant.VARIANT_METADATA_FIELD) == BinaryType
+ val isUnshredded = structType.fields.length == 2
+ val isShredded = structType.fields.length == 3 &&
+ fieldMap.contains(HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD)
+ hasRequiredFields && (isUnshredded || isShredded)
}
// Handle VariantType comparisons
@@ -261,6 +263,7 @@ abstract class BaseSpark4Adapter extends SparkAdapter with
Logging {
applyVariantLogicalType(builder).named(fieldName)
}
+ // TODO(#18935) drop-spark4.0: when all remaining 4.x adapters are parquet 1.16+, apply variantType() in this base and delete the no-op default plus the Spark4_1Adapter override.
protected def applyVariantLogicalType(builder:
Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = builder
override def isVariantShreddingStruct(structType: StructType): Boolean = {
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
index 02a39037822b..3eb1c39dc109 100644
---
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
@@ -31,7 +31,7 @@ import java.time.ZoneId
import scala.collection.JavaConverters._
-// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark
4.1+ reads
+// TODO(#18935): Delete this file when the hudi-spark4.0.x module is removed.
Spark 4.1+ reads
// variant fields by name via SPARK-54410, so the reorder workaround below is
no longer
// needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its
converters
// array in hardcoded [value, metadata] order, then indexes by schema
position. If the