voonhous commented on code in PR #18065:
URL: https://github.com/apache/hudi/pull/18065#discussion_r3371423821
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
@@ -20,38 +20,256 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
/**
- * Wrap AvroWriterSupport for plugging in the bloom filter.
+ * Wrap AvroWriterSupport for plugging in the bloom filter and variant
shredding support.
+ *
+ * <p>When variant columns are configured for shredding (via {@link
HoodieSchema.Variant#isShredded()}),
+ * this class transforms variant records at write time to populate {@code
typed_value} columns
+ * by parsing variant binary data using a {@link VariantShreddingProvider}
loaded via reflection.</p>
*/
public class HoodieAvroWriteSupport<T> extends AvroWriteSupport<T> {
private final Option<HoodieBloomFilterWriteSupport<String>>
bloomFilterWriteSupportOpt;
private final Map<String, String> footerMetadata = new HashMap<>();
protected final Properties properties;
+ /**
+ * Whether variant write shredding is enabled via config.
+ */
+ private final boolean variantWriteShreddingEnabled;
+
+ /**
+ * The effective (possibly shredded) HoodieSchema used for writing.
+ */
+ private final HoodieSchema effectiveHoodieSchema;
+
+ /**
+ * The effective Avro schema (derived from effectiveHoodieSchema).
+ */
+ private final Schema effectiveAvroSchema;
+
+ /**
+ * Indices of top-level variant fields that need shredding transformation.
+ * Empty array if no shredding is needed.
+ */
+ private final int[] shreddedVariantFieldIndices;
+
+ /**
+ * The shredded Avro sub-schema for each variant field at the corresponding
index.
+ * Indexed by position in {@link #shreddedVariantFieldIndices}.
+ */
+ private final Schema[] shreddedVariantAvroSchemas;
+
+ /**
+ * The HoodieSchema.Variant for each variant field at the corresponding
index.
+ * Indexed by position in {@link #shreddedVariantFieldIndices}.
+ */
+ private final HoodieSchema.Variant[] shreddedVariantHoodieSchemas;
+
+ /**
+ * Provider for variant shredding (loaded via reflection). Null if no
shredding is needed.
+ */
+ private final VariantShreddingProvider shreddingProvider;
+
public HoodieAvroWriteSupport(MessageType schema, HoodieSchema hoodieSchema,
Option<BloomFilter> bloomFilterOpt,
Properties properties) {
- super(schema, hoodieSchema.toAvroSchema(), ConvertingGenericData.INSTANCE);
+ this(schema, hoodieSchema, generateEffectiveSchema(hoodieSchema,
properties), bloomFilterOpt, properties);
+ }
+
+ private HoodieAvroWriteSupport(MessageType schema, HoodieSchema
hoodieSchema, HoodieSchema effectiveSchema,
+ Option<BloomFilter> bloomFilterOpt,
Properties properties) {
+ super(schema, effectiveSchema.toAvroSchema(),
ConvertingGenericData.INSTANCE);
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new);
this.properties = properties;
String vectorMeta =
HoodieSchema.buildVectorColumnsMetadataValue(hoodieSchema);
if (!vectorMeta.isEmpty()) {
footerMetadata.put(HoodieSchema.VECTOR_COLUMNS_METADATA_KEY, vectorMeta);
}
+
+ this.effectiveHoodieSchema = effectiveSchema;
+ this.effectiveAvroSchema = effectiveSchema.toAvroSchema();
+ this.variantWriteShreddingEnabled = Boolean.parseBoolean(
+ properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+ // Identify variant fields that need shredding
+ List<Integer> variantIndices = new ArrayList<>();
+ List<Schema> variantAvroSchemas = new ArrayList<>();
+ List<HoodieSchema.Variant> variantHoodieSchemas = new ArrayList<>();
+
+ if (variantWriteShreddingEnabled && effectiveSchema.getType() ==
HoodieSchemaType.RECORD) {
+ List<HoodieSchemaField> fields = effectiveSchema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ HoodieSchemaField field = fields.get(i);
+ HoodieSchema fieldSchema = field.schema();
+ // Unwrap nullable union to get the underlying type
+ if (fieldSchema.isNullable()) {
+ fieldSchema = fieldSchema.getNonNullType();
+ }
+ if (fieldSchema.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant variant = (HoodieSchema.Variant) fieldSchema;
+ if (variant.isShredded() &&
variant.getTypedValueField().isPresent()) {
+ variantIndices.add(i);
+ // Get the Avro sub-schema for this variant field from the
effective schema
+ Schema fieldAvroSchema =
effectiveAvroSchema.getFields().get(i).schema();
+ // Unwrap nullable union
+ if (fieldAvroSchema.getType() == Schema.Type.UNION) {
+ fieldAvroSchema = getNonNullFromUnion(fieldAvroSchema);
+ }
+ variantAvroSchemas.add(fieldAvroSchema);
+ variantHoodieSchemas.add(variant);
+ }
+ }
+ }
+ }
+
+ this.shreddedVariantFieldIndices =
variantIndices.stream().mapToInt(Integer::intValue).toArray();
+ this.shreddedVariantAvroSchemas = variantAvroSchemas.toArray(new
Schema[0]);
+ this.shreddedVariantHoodieSchemas = variantHoodieSchemas.toArray(new
HoodieSchema.Variant[0]);
+
+ // Load shredding provider via reflection if needed
+ if (shreddedVariantFieldIndices.length > 0) {
+ String providerClass =
properties.getProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key());
+ if (providerClass == null || providerClass.isEmpty()) {
+ throw new HoodieException("Variant write shredding is enabled and the
write schema requires shredding "
+ + "(typed_value columns present), but no VariantShreddingProvider
is configured or available on the "
+ + "classpath. Set " +
PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key() + " or add a provider "
+ + "implementation (e.g. the Spark variant module) to the
classpath.");
+ }
+ this.shreddingProvider = (VariantShreddingProvider)
ReflectionUtils.loadClass(providerClass);
+ } else {
+ this.shreddingProvider = null;
+ }
+ }
+
+ /**
+ * Generates the effective schema for writing, applying variant shredding
configuration.
+ *
+ * <p>When shredding is disabled, shredded variant fields are replaced with
unshredded
+ * variants (removing {@code typed_value}) so that the Parquet file does not
contain
+ * unused typed_value columns.</p>
+ *
+ * <p>When shredding is enabled and a forced shredding schema is configured
via
+ * {@link
HoodieStorageConfig#PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST},
+ * all variant fields are replaced with shredded variants using the forced
schema.
+ * This handles the case where the input schema is unshredded but shredding
is desired.</p>
+ *
+ * <p>When shredding is enabled without a forced schema, the schema is
returned as-is
+ * (already-shredded variants stay shredded, unshredded variants stay
unshredded).</p>
+ *
+ * @param hoodieSchema the original HoodieSchema
+ * @param properties writer properties containing shredding configuration
+ * @return the effective schema to use for writing
+ */
+ public static HoodieSchema generateEffectiveSchema(HoodieSchema
hoodieSchema, Properties properties) {
+ boolean shreddingEnabled = Boolean.parseBoolean(
+ properties.getProperty(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.key(),
+
String.valueOf(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED.defaultValue())));
+
+ if (!shreddingEnabled) {
+ // Schemas from clustering/compaction may still be shredded (read from
on-disk Parquet files
+ // written with shredding enabled), so we need to strip typed_value when
shredding
+ // is disabled.
+ return unshreddVariantFields(hoodieSchema);
+ }
+
+ // Check if a forced shredding schema is configured
+ String forceShreddingSchema = properties.getProperty(
+ PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST.key());
+ if (forceShreddingSchema != null && !forceShreddingSchema.isEmpty()) {
+ return applyForcedShreddingSchema(hoodieSchema, forceShreddingSchema);
+ }
+
+ // When enabled without forced schema, use the schema as-is
+ // (shredded variants stay shredded, unshredded variants stay unshredded)
+ return hoodieSchema;
+ }
+
+ /**
+ * Overloaded version accepting HoodieConfig for use by factories.
+ */
+ public static HoodieSchema generateEffectiveSchema(HoodieSchema
hoodieSchema, HoodieConfig config) {
+ return generateEffectiveSchema(hoodieSchema, config.getProps());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void write(T record) {
+ if (shreddedVariantFieldIndices.length > 0 && shreddingProvider != null) {
+ IndexedRecord inputRecord = (IndexedRecord) record;
+ GenericRecord shreddedRecord = new
GenericData.Record(effectiveAvroSchema);
+
+ // Copy all fields, transforming variant fields that need shredding
+ List<Schema.Field> effectiveFields = effectiveAvroSchema.getFields();
+ Schema inputSchema = inputRecord.getSchema();
+
+ for (int i = 0; i < effectiveFields.size(); i++) {
+ Schema.Field effectiveField = effectiveFields.get(i);
+ String fieldName = effectiveField.name();
+ Schema.Field inputField = inputSchema.getField(fieldName);
+ if (inputField == null) {
+ continue;
+ }
+
+ int variantIdx = findVariantIndex(i);
+ if (variantIdx >= 0) {
+ // This is a shredded variant field - transform it
+ Object fieldValue = inputRecord.get(inputField.pos());
+ if (fieldValue instanceof GenericRecord) {
+ GenericRecord variantRecord = (GenericRecord) fieldValue;
+ GenericRecord shreddedVariant =
shreddingProvider.shredVariantRecord(
+ variantRecord,
+ shreddedVariantAvroSchemas[variantIdx],
+ shreddedVariantHoodieSchemas[variantIdx]);
+ shreddedRecord.put(i, shreddedVariant);
Review Comment:
Good catch. This is the read-then-reshred path: compaction/clustering over a
base file that was written with shredding enabled. There is no reader-side step
that reconstructs the unshredded `{metadata, value}` form, so an
already-shredded record reaches the writer, where `shredVariantRecord` returns
null and the variant value is silently dropped.
This has the same root cause as the symmetric shredding-disabled case:
https://github.com/apache/hudi/pull/18065#discussion_r3368472566
For now I added a fail-fast guard in `write()`
(`assertInputNotAlreadyShredded`) that throws on an already-shredded input
variant instead of silently losing data. The real fix (reconstruct on read,
then re-shred) is tracked in #18931.
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java:
##########
Review Comment:
Confirmed, and this is the symmetric counterpart of the shredding-enabled
thread: https://github.com/apache/hudi/pull/18065#discussion_r3368472554
It has the same root cause: there is no reader-side reconstruction of the
unshredded variant. With shredding disabled, the unshredded writer schema makes
`value` REQUIRED, so an already-shredded record (whose `value` is null) fails
at write time.
Both are now guarded by the same fail-fast check in `write()`
(`assertInputNotAlreadyShredded`), and the real fix is tracked in #18931.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]