hudi-agent commented on code in PR #18065:
URL: https://github.com/apache/hudi/pull/18065#discussion_r3368472556
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -74,6 +292,181 @@ public void addFooterMetadata(String key, String value) {
footerMetadata.put(key, value);
}
+ /**
+ * Finds the position in {@link #shreddedVariantFieldIndices} for the given effective field index,
+ * or -1 if this field is not a variant field that needs shredding.
+ */
+ private int findVariantIndex(int effectiveFieldIndex) {
+ for (int i = 0; i < shreddedVariantFieldIndices.length; i++) {
+ if (shreddedVariantFieldIndices[i] == effectiveFieldIndex) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private static final Pattern DECIMAL_PATTERN = Pattern.compile(
+ "decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");
+
+ /**
+ * Applies a forced shredding schema to all variant fields in the given schema.
+ * The forced schema DDL (e.g., {@code "a int, b string"}) defines the typed_value
+ * fields that will be added to each variant column.
+ */
+ private static HoodieSchema applyForcedShreddingSchema(HoodieSchema schema, String ddl) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
+ return schema;
+ }
+
+ Map<String, HoodieSchema> shreddedFields = parseShreddingDDL(ddl);
+
+ List<HoodieSchemaField> fields = schema.getFields();
+ List<HoodieSchemaField> newFields = new ArrayList<>();
+ boolean changed = false;
+
+ for (HoodieSchemaField field : fields) {
+ HoodieSchema fieldSchema = field.schema();
+ boolean wasNullable = fieldSchema.isNullable();
+ HoodieSchema unwrapped = wasNullable ? fieldSchema.getNonNullType() : fieldSchema;
+
+ if (unwrapped.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant shreddedVariant = HoodieSchema.createVariantShreddedObject(
+ unwrapped.getAvroSchema().getName(),
+ unwrapped.getAvroSchema().getNamespace(),
+ unwrapped.getAvroSchema().getDoc(),
+ shreddedFields);
+ HoodieSchema replacement = wasNullable
+ ? HoodieSchema.createNullable(shreddedVariant) : shreddedVariant;
+ newFields.add(HoodieSchemaUtils.createNewSchemaField(field.makeNullable().withSchema(replacement)));
+ changed = true;
+ } else {
+ newFields.add(HoodieSchemaUtils.createNewSchemaField(field));
+ }
+ }
+
+ if (!changed) {
+ return schema;
+ }
+
+ return HoodieSchema.createRecord(
+ schema.getAvroSchema().getName(),
+ schema.getAvroSchema().getNamespace(),
+ schema.getAvroSchema().getDoc(),
+ newFields);
+ }
+
+ /**
+ * Parses a DDL-style shredding schema string (e.g., {@code "a int, b string, c decimal(15,1)"})
+ * into a map of field names to their HoodieSchema types.
+ */
+ private static Map<String, HoodieSchema> parseShreddingDDL(String ddl) {
+ Map<String, HoodieSchema> fields = new LinkedHashMap<>();
+ for (String fieldDef : ddl.split(",")) {
+ String trimmed = fieldDef.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ String[] parts = trimmed.split("\\s+", 2);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid shredding DDL field definition (expected 'name type'): " + trimmed);
+ }
+ fields.put(parts[0].trim(), parseSimpleType(parts[1].trim()));
+ }
+ return fields;
+ }
+
+ /**
+ * Parses a simple type name into a HoodieSchema.
+ * Supports common types: int, long, string, double, float, boolean, binary, decimal(p,s).
+ */
+ private static HoodieSchema parseSimpleType(String type) {
+ String lower = type.toLowerCase();
+ switch (lower) {
+ case "int":
+ case "integer":
+ return HoodieSchema.create(HoodieSchemaType.INT);
+ case "long":
+ case "bigint":
+ return HoodieSchema.create(HoodieSchemaType.LONG);
+ case "string":
+ return HoodieSchema.create(HoodieSchemaType.STRING);
+ case "double":
+ return HoodieSchema.create(HoodieSchemaType.DOUBLE);
+ case "float":
+ return HoodieSchema.create(HoodieSchemaType.FLOAT);
+ case "boolean":
+ return HoodieSchema.create(HoodieSchemaType.BOOLEAN);
+ case "binary":
+ return HoodieSchema.create(HoodieSchemaType.BYTES);
+ default:
+ Matcher m = DECIMAL_PATTERN.matcher(lower);
+ if (m.matches()) {
+ return HoodieSchema.createDecimal(
+ Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
+ }
+ throw new IllegalArgumentException("Unsupported shredding type: " + type);
+ }
+ }
+
+ /**
+ * Strips shredding from variant fields in the schema.
+ * Replaces shredded variant fields with unshredded variants (removing typed_value).
+ */
+ private static HoodieSchema unshreddVariantFields(HoodieSchema schema) {
+ if (schema.getType() != HoodieSchemaType.RECORD) {
Review Comment:
🤖 nit: `unshreddVariantFields` reads awkwardly — `unshredd` isn't a real
verb form. Something like `stripVariantShredding` or `removeVariantShredding`
would convey the intent more naturally.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
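A standalone sketch of the `name type` DDL parsing that `parseShreddingDDL` and `parseSimpleType` perform. It rests on two assumptions:

- Types come back as normalized strings rather than `HoodieSchema` instances, so the example needs no Hudi classes.
- `ShreddingDdlSketch`, `splitTopLevel`, `parse` and `normalizeType` are hypothetical names.

The sketch splits only on commas outside parentheses. A plain `ddl.split(",")` also cuts the Javadoc's own example `c decimal(15,1)` into `c decimal(15` and `1)`, and the first fragment is then rejected as an unsupported type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: returns normalized type names instead of HoodieSchema objects.
final class ShreddingDdlSketch {
  private static final Pattern DECIMAL_PATTERN =
      Pattern.compile("decimal\\s*\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)");

  // Splits on commas that are not nested inside parentheses,
  // so "c decimal(15,1)" stays a single field definition.
  static List<String> splitTopLevel(String ddl) {
    List<String> parts = new ArrayList<>();
    int depth = 0;
    int start = 0;
    for (int i = 0; i < ddl.length(); i++) {
      char c = ddl.charAt(i);
      if (c == '(') {
        depth++;
      } else if (c == ')') {
        depth--;
      } else if (c == ',' && depth == 0) {
        parts.add(ddl.substring(start, i));
        start = i + 1;
      }
    }
    parts.add(ddl.substring(start));
    return parts;
  }

  // Parses "name type, name type, ..." into an insertion-ordered map.
  static Map<String, String> parse(String ddl) {
    Map<String, String> fields = new LinkedHashMap<>();
    for (String fieldDef : splitTopLevel(ddl)) {
      String trimmed = fieldDef.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      String[] nameAndType = trimmed.split("\\s+", 2);
      if (nameAndType.length != 2) {
        throw new IllegalArgumentException(
            "Invalid shredding DDL field definition (expected 'name type'): " + trimmed);
      }
      fields.put(nameAndType[0], normalizeType(nameAndType[1].trim()));
    }
    return fields;
  }

  // Accepts the same aliases as parseSimpleType in the diff.
  static String normalizeType(String type) {
    String lower = type.toLowerCase(Locale.ROOT);
    switch (lower) {
      case "int":
      case "integer":
        return "int";
      case "long":
      case "bigint":
        return "long";
      case "string":
      case "double":
      case "float":
      case "boolean":
      case "binary":
        return lower;
      default:
        Matcher m = DECIMAL_PATTERN.matcher(lower);
        if (m.matches()) {
          return "decimal(" + m.group(1) + "," + m.group(2) + ")";
        }
        throw new IllegalArgumentException("Unsupported shredding type: " + type);
    }
  }
}
```

Tracking parenthesis depth is enough here because the only parameterized type accepted is `decimal(p,s)`. Nested struct or array types would need a real tokenizer.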
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -20,38 +20,256 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
/**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriterSupport for plugging in the bloom filter and variant shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider} loaded via reflection.</p>
*/
public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;
private final Map<String, String> footerMetadata = new HashMap<>();
protected final Properties properties;
+ /**
+ * Whether variant write shredding is enabled via config.
+ */
+ private final boolean variantWriteShreddingEnabled;
+
+ /**
+ * The effective (possibly shredded) HoodieSchema used for writing.
+ */
+ private final HoodieSchema effectiveHoodieSchema;
+
+ /**
+ * The effective Avro schema (derived from effectiveHoodieSchema).
+ */
+ private final Schema effectiveAvroSchema;
+
+ /**
+ * Indices of top-level variant fields that need shredding transformation.
+ * Empty array if no shredding is needed.
+ */
+ private final int[] shreddedVariantFieldIndices;
+
+ /**
+ * The shredded Avro sub-schema for each variant field at the corresponding index.
+ * Indexed by position in {@link #shreddedVariantFieldIndices}.
+ */
+ private final Schema[] shreddedVariantAvroSchemas;
+
+ /**
+ * The HoodieSchema.Variant for each variant field at the corresponding index.
+ * Indexed by position in {@link #shreddedVariantFieldIndices}.
+ */
+ private final HoodieSchema.Variant[] shreddedVariantHoodieSchemas;
+
+ /**
+ * Provider for variant shredding (loaded via reflection). Null if no shredding is needed.
+ */
+ private final VariantShreddingProvider shreddingProvider;
+
public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema,
Option<BloomFilter> bloomFilterOpt,
Properties properties) {
- super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+ this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema, properties), bloomFilterOpt, properties);
+ }
+
+ private HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema, HoodieSchema effectiveSchema,
+ Option<BloomFilter> bloomFilterOpt, Properties properties) {
+ super(schema, effectiveSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
this.properties = properties;
String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
if (!vectorMeta.isEmpty()) {
footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
}
+
+ this.effectiveHoodieSchema = effectiveSchema;
+ this.effectiveAvroSchema = effectiveSchema.toAvroSchema();
+ this.variantWriteShreddingEnabled = Boolean.parseBoolean(
+ properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+ String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+ // Identify variant fields that need shredding
+ List<Integer> variantIndices = new ArrayList<>();
+ List<Schema> variantAvroSchemas = new ArrayList<>();
+ List<HoodieSchema.Variant> variantHoodieSchemas = new ArrayList<>();
+
+ if (variantWriteShreddingEnabled && effectiveSchema.getType() == HoodieSchemaType.RECORD) {
+ List<HoodieSchemaField> fields = effectiveSchema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ HoodieSchemaField field = fields.get(i);
+ HoodieSchema fieldSchema = field.schema();
+ // Unwrap nullable union to get the underlying type
+ if (fieldSchema.isNullable()) {
+ fieldSchema = fieldSchema.getNonNullType();
+ }
+ if (fieldSchema.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
+ if (variant.isShredded() && variant.getTypedValueField().isPresent()) {
+ variantIndices.add(i);
+ // Get the Avro sub-schema for this variant field from the effective schema
+ Schema fieldAvroSchema = effectiveAvroSchema.getFields().get(i).schema();
+ // Unwrap nullable union
+ if (fieldAvroSchema.getType() == Schema.Type.UNION) {
+ fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema);
+ }
+ variantAvroSchemas.add(fieldAvroSchema);
+ variantHoodieSchemas.add(variant);
+ }
+ }
+ }
+ }
+
+ this.shreddedVariantFieldIndices = variantIndices.stream().mapToInt(Integer::intValue).toArray();
+ this.shreddedVariantAvroSchemas = variantAvroSchemas.toArray(new Schema[0]);
+ this.shreddedVariantHoodieSchemas = variantHoodieSchemas.toArray(new HoodieSchema.Variant[0]);
+
+ // Load shredding provider via reflection if needed
+ if (shreddedVariantFieldIndices.length > 0) {
+ String providerClass = properties.getProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key());
+ if (providerClass == null || providerClass.isEmpty()) {
+ throw new HoodieException("Variant write shredding is enabled and the write schema requires shredding "
+ + "(typed_value columns present), but no VariantShreddingProvider is configured or available on the "
+ + "classpath. Set " + PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key() + " or add a provider "
+ + "implementation (e.g. the Spark variant module) to the classpath.");
+ }
+ this.shreddingProvider = (VariantShreddingProvider) ReflectionUtils.loadClass(providerClass);
+ } else {
+ this.shreddingProvider = null;
+ }
+ }
+
+ /**
+ * Generates the effective schema for writing, applying variant shredding configuration.
+ *
+ * <p>When shredding is disabled, shredded variant fields are replaced with unshredded
+ * variants (removing {@code typed_value}) so that the Parquet file does not contain
+ * unused typed_value columns.</p>
+ *
+ * <p>When shredding is enabled and a forced shredding schema is configured via
+ * {@link HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST},
+ * all variant fields are replaced with shredded variants using the forced schema.
+ * This handles the case where the input schema is unshredded but shredding is desired.</p>
+ *
+ * <p>When shredding is enabled without a forced schema, the schema is returned as-is
+ * (already-shredded variants stay shredded, unshredded variants stay unshredded).</p>
+ *
+ * @param hoodieSchema the original HoodieSchema
+ * @param properties writer properties containing shredding configuration
+ * @return the effective schema to use for writing
+ */
+ public static HoodieSchema generateEffectiveSchema(HoodieSchema hoodieSchema, Properties properties) {
+ boolean shreddingEnabled = Boolean.parseBoolean(
+ properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+ String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+ if (!shreddingEnabled) {
+ // Schemas from clustering/compaction may still be shredded (read from on-disk Parquet files
+ // written with shredding enabled), so we need to strip typed_value when shredding
+ // is disabled.
+ return unshreddVariantFields(hoodieSchema);
+ }
+
+ // Check if a forced shredding schema is configured
+ String forceShreddingSchema = properties.getProperty(
+ PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key());
+ if (forceShreddingSchema != null && !forceShreddingSchema.isEmpty()) {
+ return applyForcedShreddingSchema(hoodieSchema, forceShreddingSchema);
+ }
+
+ // When enabled without forced schema, use the schema as-is
+ // (shredded variants stay shredded, unshredded variants stay unshredded)
+ return hoodieSchema;
+ }
+
+ /**
+ * Overloaded version accepting HoodieConfig for use by factories.
+ */
+ public static HoodieSchema generateEffectiveSchema(HoodieSchema hoodieSchema, HoodieConfig config) {
+ return generateEffectiveSchema(hoodieSchema, config.getProps());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void write(T record) {
+ if (shreddedVariantFieldIndices.length > 0 && shreddingProvider != null) {
+ IndexedRecord inputRecord = (IndexedRecord) record;
+ GenericRecord shreddedRecord = new GenericData.Record(effectiveAvroSchema);
+
+ // Copy all fields, transforming variant fields that need shredding
+ List<Schema.Field> effectiveFields = effectiveAvroSchema.getFields();
+ Schema inputSchema = inputRecord.getSchema();
+
+ for (int i = 0; i < effectiveFields.size(); i++) {
+ Schema.Field effectiveField = effectiveFields.get(i);
+ String fieldName = effectiveField.name();
+ Schema.Field inputField = inputSchema.getField(fieldName);
+ if (inputField == null) {
+ continue;
+ }
+
+ int variantIdx = findVariantIndex(i);
+ if (variantIdx >= 0) {
+ // This is a shredded variant field - transform it
+ Object fieldValue = inputRecord.get(inputField.pos());
+ if (fieldValue instanceof GenericRecord) {
+ GenericRecord variantRecord = (GenericRecord) fieldValue;
+ GenericRecord shreddedVariant = shreddingProvider.shredVariantRecord(
+ variantRecord,
+ shreddedVariantAvroSchemas[variantIdx],
+ shreddedVariantHoodieSchemas[variantIdx]);
+ shreddedRecord.put(i, shreddedVariant);
Review Comment:
🤖 What happens during compaction/clustering when the input records were read
from an already-shredded base file?
`Spark4VariantShreddingProvider.shredVariantRecord` returns `null` when `value`
or `metadata` is null, but in shredded form `value` is legitimately null when
`typed_value` carries the data. Could this silently drop the variant payload?
@yihua might want to weigh in on whether the upstream Avro reader is expected
to reconstruct the unshredded form before this point.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
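The concern can be made concrete with a guard that classifies an incoming variant before it reaches the shredder. This is a hypothetical sketch, not Hudi's API.

- `VariantParts` stands in for the Avro variant record (`metadata`, `value`, `typed_value`).
- `ShredGuardSketch.classify` is an illustrative name.
- The rule "any non-null `typed_value` means already shredded, pass through" assumes the input's `typed_value` layout matches the target shredding schema. If it does not, the row would have to be rebuilt into unshredded binary first.

```java
// Hypothetical stand-in for the Avro variant record: `metadata` and `value` are the
// variant binary parts, `typedValue` is the shredded typed_value payload (or null).
final class VariantParts {
  final byte[] metadata;
  final byte[] value;
  final Object typedValue;

  VariantParts(byte[] metadata, byte[] value, Object typedValue) {
    this.metadata = metadata;
    this.value = value;
    this.typedValue = typedValue;
  }
}

final class ShredGuardSketch {
  enum Action { SHRED, PASS_THROUGH_SHREDDED, WRITE_NULL }

  // Decides how a variant should be handled before any shredding call is made.
  static Action classify(VariantParts v) {
    if (v == null) {
      // The whole variant column is null for this row.
      return Action.WRITE_NULL;
    }
    if (v.typedValue != null) {
      // Already shredded (e.g. read back from a shredded base file). `value` may
      // legitimately be null here, so treating it as "no payload" would lose data.
      return Action.PASS_THROUGH_SHREDDED;
    }
    if (v.metadata == null || v.value == null) {
      // No typed_value and an incomplete binary payload: treated as invalid in this sketch.
      throw new IllegalStateException("variant is missing metadata or value and has no typed_value");
    }
    return Action.SHRED;
  }
}
```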
##########
hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala:
##########
@@ -261,6 +263,7 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging {
applyVariantLogicalType(builder).named(fieldName)
}
+ // TODO(HUDI-XXXX) drop-spark4.0: when all remaining 4.x adapters are parquet 1.16+, apply variantType() in this base and delete the no-op default plus the Spark4_1Adapter override.
Review Comment:
🤖 nit: `HUDI-XXXX` is a placeholder — could you file a tracking ticket and
replace this with the real JIRA ID before merging? TODOs without a real ticket
tend to become permanent.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
Review Comment:
🤖 **Line 285:** Symmetric concern to the shredding-enabled case: when
shredding is disabled, `unshreddVariantFields` strips `typed_value` from the
effective writer schema, but records here are passed straight to
`super.write(record)`. If a compaction reads a shredded base file and produces
records with `typed_value` present and `value=null`, the parquet writer will
reject the null at the unshredded schema's REQUIRED `value` field. Is the
upstream read path expected to convert to unshredded form before reaching the
writer?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
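For reference, the three cases documented on `generateEffectiveSchema` reduce to the decision below.

- The property keys are placeholders, not the real `HoodieStorageConfig` key strings.
- The default for the enabled flag is passed in rather than guessed.
- `EffectiveSchemaModeSketch` and `Mode` are hypothetical names.

The `STRIP_SHREDDING` outcome only changes the schema. As the comment above points out, a row whose data sits only in `typed_value` would still have to be converted before `super.write(record)`.

```java
import java.util.Properties;

final class EffectiveSchemaModeSketch {
  // Placeholder keys; the real names live in HoodieStorageConfig and are not reproduced here.
  static final String SHREDDING_ENABLED_KEY = "example.variant.write.shredding.enabled";
  static final String FORCED_SCHEMA_KEY = "example.variant.force.shredding.schema";

  enum Mode { STRIP_SHREDDING, APPLY_FORCED_SCHEMA, KEEP_AS_IS }

  static Mode decide(Properties props, boolean enabledDefault) {
    boolean enabled = Boolean.parseBoolean(
        props.getProperty(SHREDDING_ENABLED_KEY, String.valueOf(enabledDefault)));
    if (!enabled) {
      // Disabled: strip typed_value, even if the incoming schema (e.g. from
      // compaction/clustering of shredded base files) is already shredded.
      return Mode.STRIP_SHREDDING;
    }
    String forced = props.getProperty(FORCED_SCHEMA_KEY);
    if (forced != null && !forced.isEmpty()) {
      // Enabled with a forced DDL: every variant field gets that typed_value layout.
      return Mode.APPLY_FORCED_SCHEMA;
    }
    // Enabled without a forced DDL: shredded stays shredded, unshredded stays unshredded.
    return Mode.KEEP_AS_IS;
  }
}
```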
--
This is an automated message from the Apache Git Service.