hudi-agent commented on code in PR #18961: URL: https://github.com/apache/hudi/pull/18961#discussion_r3408830015
########## hudi-common/src/main/java/org/apache/hudi/io/storage/VariantShreddingInferenceFileWriter.java: ########## @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.avro.VariantShreddingSchemaInferrer; +import org.apache.hudi.avro.VariantShreddingSchemaInferrer.VariantSample; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.SizeEstimator; +import org.apache.hudi.exception.HoodieIOException; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A {@link HoodieFileWriter} decorator that infers a per-file variant shredding schema from the + * first records before opening the real parquet writer. 
+ * + * <p>Records are buffered (and their variant binaries sampled) until a threshold is reached or + * the writer closes, the sampled binaries are fed to a {@link VariantShreddingSchemaInferrer}, + * the real writer is created against the schema with the inferred typed_value spliced in, and + * the buffer is replayed in arrival order. Replay reproduces each call exactly (write vs + * writeWithMetadata), so commit seqnos, bloom filters and min/max record keys come out + * identical to a non-buffered write. Buffering thresholds mirror Spark's + * {@code ParquetOutputWriterWithVariantShredding} (4096 rows / 64MB). + * + * <p>Buffered records are {@link HoodieRecord#copy() copied} because Spark iterators reuse row + * instances. For record types where copy() is identity (Avro), replay additionally relies on + * writer-level records being freshly allocated per record, which holds today; variant samples + * are extracted eagerly into immutable byte arrays so inference itself never depends on it. + * + * <p>Inference failures never fail the write: the file falls back to unshredded variants. This + * deliberately diverges from Spark (which propagates inference failures) because a throwing + * inference would fail compaction. Writer-creation or replay failures, however, are latched and + * rethrown from every subsequent call including {@link #close()}, so a task cannot silently + * drop buffered records that the handle already counted as written. + * + * <p>Single-threaded by contract, same as the writers it wraps. + * + * <p>See https://github.com/apache/hudi/issues/18937.</p> + */ +@Slf4j +public class VariantShreddingInferenceFileWriter implements HoodieFileWriter { + + /** Buffer caps mirroring Spark's ParquetOutputWriterWithVariantShredding. 
*/ + public static final int MAX_BUFFERED_RECORDS = 4096; + public static final long MAX_BUFFERED_BYTES = 64L * 1024 * 1024; + private static final int SIZE_ESTIMATE_INTERVAL = 100; + + /** + * Extracts the variant binaries of the inferable columns from a record. Bound to the writer + * schema and column set by the creating factory; must defensively copy the bytes. + */ + @FunctionalInterface + public interface VariantSampleExtractor { + VariantSample[] extract(HoodieRecord record, HoodieSchema schema, Properties props) throws IOException; + } + + /** Creates the real file writer once the inferred typed_value schemas are known. */ + @FunctionalInterface + public interface InferredWriterFactory { + HoodieFileWriter create(Map<String, HoodieSchema> inferredTypedValues) throws IOException; + } + + private final List<String> variantColumns; + private final VariantSampleExtractor extractor; + private final VariantShreddingSchemaInferrer inferrer; + private final InferredWriterFactory writerFactory; + private final long maxBufferedBytes; + private final SizeEstimator<HoodieRecord> sizeEstimator = new DefaultSizeEstimator<>(); + + private final List<BufferedWrite> buffer = new ArrayList<>(); + private final List<VariantSample[]> samples = new ArrayList<>(); + private long estimatedRecordSize = 0; + private long bufferedBytes = 0; + private HoodieFileWriter delegate; + private IOException fatalFailure; + private boolean closed = false; + + public VariantShreddingInferenceFileWriter(List<String> variantColumns, + VariantSampleExtractor extractor, + VariantShreddingSchemaInferrer inferrer, + InferredWriterFactory writerFactory, + long maxFileSize) { + this.variantColumns = variantColumns; + this.extractor = extractor; + this.inferrer = inferrer; + this.writerFactory = writerFactory; + this.maxBufferedBytes = Math.min(MAX_BUFFERED_BYTES, Math.max(1, maxFileSize)); + } + + @Override + public boolean canWrite() { + // Nothing has been physically written while buffering, so 
size-based rollover cannot apply yet. + return delegate == null || delegate.canWrite(); + } + + @Override + public void writeWithMetadata(HoodieKey key, HoodieRecord record, HoodieSchema schema, Properties props) throws IOException { + rethrowIfFailed(); + if (delegate != null) { + delegate.writeWithMetadata(key, record, schema, props); + } else { + buffer(true, key, null, record, schema, props); + } + } + + @Override + public void write(String recordKey, HoodieRecord record, HoodieSchema schema, Properties props) throws IOException { Review Comment: 🤖 Minor: in the success path the try block already calls `delegate.close()`, then if that throws the catch block calls `delegate.close()` a second time. Most parquet writers' close() is idempotent so this is benign, but tracking a `delegateClosed` flag would avoid relying on that contract. Same pattern in VariantShreddingInferenceInternalRowFileWriter.close(). <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/VariantReconstruction.java: ########## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.io.storage.hadoop; + +import org.apache.hudi.avro.VariantSchemaUtils; +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.avro.VariantShreddingRuntime; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.storage.HoodieStorage; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Reconstructs unshredded variants when reading an already-shredded base file on the Avro + * ({@code HoodieRecordType.AVRO}) read path. + * + * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as + * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded + * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}} + * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the + * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this. + * + * <p>See https://github.com/apache/hudi/issues/18931. + */ +final class VariantReconstruction { + + private static final Logger LOG = LoggerFactory.getLogger(VariantReconstruction.class); + + private final HoodieSchema intermediateSchema; + private final Schema outputAvroSchema; + private final VariantShreddingProvider provider; + // Indexed by field position in the (requested == output) record. 
For target fields, the file's + // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets. + private final boolean[] isTarget; + private final Schema[] shreddedSubSchemas; + private final Schema[] unshreddedSubSchemas; + + private VariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema, + VariantShreddingProvider provider, boolean[] isTarget, + Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) { + this.intermediateSchema = intermediateSchema; + this.outputAvroSchema = outputAvroSchema; + this.provider = provider; + this.isTarget = isTarget; + this.shreddedSubSchemas = shreddedSubSchemas; + this.unshreddedSubSchemas = unshreddedSubSchemas; + } + + /** + * Schema to read the parquet file with: the requested schema, but with shredded variant columns + * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}. + */ + HoodieSchema intermediateSchema() { + return intermediateSchema; + } + + /** + * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when + * none is needed (no shredded variant columns in the file, reading shredded variants disabled, or + * no provider available - in which case the read proceeds unchanged). 
+ */ + static VariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) { + if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) { + return null; + } + if (!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(), + HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue())) { + return null; + } + + List<HoodieSchemaField> requestedFields = requestedSchema.getFields(); + List<HoodieSchemaField> intermediateFields = new ArrayList<>(); + boolean[] isTarget = new boolean[requestedFields.size()]; + boolean anyTarget = false; + for (int i = 0; i < requestedFields.size(); i++) { + HoodieSchemaField requestedField = requestedFields.get(i); + Option<HoodieSchemaField> fileField = fileSchema.getField(requestedField.name()); + if (fileField.isPresent() && isShreddedVariantTarget(requestedField.schema(), fileField.get().schema())) { + isTarget[i] = true; + anyTarget = true; + // Read this column in its on-disk shredded shape. createNewSchemaField: the requested + // schema's avro fields are position-attached and cannot be reused in a new record. + intermediateFields.add(HoodieSchemaUtils.createNewSchemaField(requestedField.withSchema(fileField.get().schema()))); + } else { + intermediateFields.add(HoodieSchemaUtils.createNewSchemaField(requestedField)); + } + } + if (!anyTarget) { + return null; + } + + VariantShreddingProvider provider = loadProvider(storage); + if (provider == null) { + LOG.warn("Base file has shredded variant column(s) but no VariantShreddingProvider is available; " + + "variants will not be reconstructed. 
Set {} or add a provider implementation to the classpath.", + HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key()); + return null; + } + + HoodieSchema intermediateSchema = HoodieSchema.createRecord( + requestedSchema.getAvroSchema().getName(), + requestedSchema.getAvroSchema().getNamespace(), + requestedSchema.getAvroSchema().getDoc(), + intermediateFields); + // Records leave this reader unshredded; output field order matches the requested/intermediate order. + HoodieSchema outputSchema = VariantSchemaUtils.stripVariantShredding(requestedSchema); + + Schema[] shreddedSubSchemas = new Schema[requestedFields.size()]; + Schema[] unshreddedSubSchemas = new Schema[requestedFields.size()]; + for (int i = 0; i < requestedFields.size(); i++) { + if (isTarget[i]) { + shreddedSubSchemas[i] = nonNull(fileSchema.getField(requestedFields.get(i).name()).get().schema()).getAvroSchema(); + unshreddedSubSchemas[i] = nonNull(outputSchema.getFields().get(i).schema()).getAvroSchema(); + } + } + + return new VariantReconstruction(intermediateSchema, outputSchema.toAvroSchema(), provider, + isTarget, shreddedSubSchemas, unshreddedSubSchemas); + } + + /** + * Rebuilds shredded variant columns of {@code in} (read in the intermediate shredded shape) into + * a record conforming to the unshredded output schema. + */ + IndexedRecord reconstruct(IndexedRecord in) { + GenericRecord out = new GenericData.Record(outputAvroSchema); + for (int i = 0; i < isTarget.length; i++) { + Object value = in.get(i); + if (isTarget[i] && value instanceof GenericRecord) { + out.put(i, provider.rebuildVariantRecord((GenericRecord) value, shreddedSubSchemas[i], unshreddedSubSchemas[i])); + } else { + // Non-variant column, or a null variant column: pass through unchanged. + out.put(i, value); + } + } + return out; + } + + /** + * Whether this column must be read in its on-disk shredded shape and reconstructed. 
The file + * schema comes from converting the parquet footer MessageType, which loses the variant + * logical type (variant groups come back as plain records), so the on-disk side is detected + * by SHAPE, anchored by the requested side: the requested column (from the table schema, + * logical type intact) must be a variant for the shape match to count. + */ + private static boolean isShreddedVariantTarget(HoodieSchema requestedFieldSchema, HoodieSchema fileFieldSchema) { + HoodieSchema file = nonNull(fileFieldSchema); + if (file.getType() == HoodieSchemaType.VARIANT && ((HoodieSchema.Variant) file).isShredded()) { + return true; + } + HoodieSchema requested = nonNull(requestedFieldSchema); + return requested.getType() == HoodieSchemaType.VARIANT + && VariantSchemaUtils.isShreddedVariantShape(file); + } + + private static HoodieSchema nonNull(HoodieSchema schema) { Review Comment: 🤖 nit: `nonNull(schema)` reads like a null guard (similar to `Objects.requireNonNull` or `Objects.nonNull`) but it actually unwraps a nullable Avro union type — a very different operation. Something like `unwrapNullable` or `unwrapIfNullable` would make the intent immediately clear at the call sites on lines 178 and 182. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java: ########## @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.variant; + +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.common.schema.HoodieSchema; + +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.types.variant.ShreddingUtils; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.types.variant.VariantSchema; +import org.apache.spark.types.variant.VariantShreddingWriter; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Implementation of {@link VariantShreddingProvider} using Spark 4's variant parsing library. + * + * <p>This class bridges the Avro record path and Spark's {@link VariantShreddingWriter} + * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. 
It converts + * the shredded output into Avro {@link GenericRecord}s that can be written via + * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p> + * + * <p>The shredding logic is delegated to {@link VariantShreddingWriter#castShredded}, + * which handles scalar, object, and array shredding including residual value construction + * for non-matching fields. This class implements the {@link ShreddedResult} and + * {@link ShreddedResultBuilder} interfaces to collect the shredded components into + * Avro GenericRecords.</p> + */ +public class Spark4VariantShreddingProvider implements VariantShreddingProvider { + + private static final String VALUE_FIELD = "value"; + private static final String METADATA_FIELD = "metadata"; + private static final String TYPED_VALUE_FIELD = "typed_value"; + + @Override + public GenericRecord shredVariantRecord( + GenericRecord unshreddedVariant, + Schema shreddedSchema, + HoodieSchema.Variant variantSchema) { + + ByteBuffer valueBuf = (ByteBuffer) unshreddedVariant.get(VALUE_FIELD); + ByteBuffer metadataBuf = (ByteBuffer) unshreddedVariant.get(METADATA_FIELD); + + if (valueBuf == null || metadataBuf == null) { + return null; + } + + byte[] valueBytes = toByteArray(valueBuf); + byte[] metadataBytes = toByteArray(metadataBuf); + + Variant variant = new Variant(valueBytes, metadataBytes); + + // Build VariantSchema from the Avro shredded schema, registering + // Avro schemas at each level for GenericRecord construction. + AvroShreddedResultBuilder builder = new AvroShreddedResultBuilder(); + VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, builder); + + // Delegate to Spark's VariantShreddingWriter for the actual shredding logic. 
+ AvroShreddedResult result = (AvroShreddedResult) + VariantShreddingWriter.castShredded(variant, sparkSchema, builder); + + return result.toGenericRecord(); + } + + @Override + public GenericRecord rebuildVariantRecord( + GenericRecord shreddedVariant, + Schema shreddedSchema, + Schema unshreddedSchema) { + + if (shreddedVariant == null) { + return null; + } + ByteBuffer metadataBuf = (ByteBuffer) shreddedVariant.get(METADATA_FIELD); + if (metadataBuf == null) { + return null; + } + + // Reuse the same VariantSchema index assignment as the write path (no builder needed on read). + VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, null); + + // Delegate to Spark's reconstruction algorithm (inverse of castShredded). + Variant variant = ShreddingUtils.rebuild(new AvroVariantRow(shreddedVariant, sparkSchema), sparkSchema); + + GenericRecord out = new GenericData.Record(unshreddedSchema); + out.put(METADATA_FIELD, ByteBuffer.wrap(variant.getMetadata())); + out.put(VALUE_FIELD, ByteBuffer.wrap(variant.getValue())); + return out; + } + + /** + * Builds a {@link VariantSchema} from an Avro {@link Schema} representing a + * shredded variant structure ({@code value}, {@code metadata}, {@code typed_value}). + * + * <p>This method also registers the Avro schema mapping in the builder so that + * {@link AvroShreddedResultBuilder#createEmpty} can create results with the + * correct Avro schema at each nesting level.</p> + */ + private VariantSchema buildVariantSchema(Schema avroSchema, boolean isTopLevel, + AvroShreddedResultBuilder builder) { + Schema.Field valueField = avroSchema.getField(VALUE_FIELD); + Schema.Field metadataField = avroSchema.getField(METADATA_FIELD); + Schema.Field typedValueField = avroSchema.getField(TYPED_VALUE_FIELD); + + int idx = 0; + int variantIdx = valueField != null ? 
idx++ : -1; + int topLevelMetadataIdx; + if (metadataField != null && isTopLevel) { + topLevelMetadataIdx = idx++; + } else { + topLevelMetadataIdx = -1; + if (metadataField != null) { + idx++; + } + } + int typedIdx = typedValueField != null ? idx++ : -1; + int numFields = idx; + + VariantSchema.ScalarType scalarSchema = null; + VariantSchema.ObjectField[] objectSchema = null; + VariantSchema arraySchema = null; + + if (typedValueField != null) { + Schema tvSchema = unwrapNullable(typedValueField.schema()); + + switch (tvSchema.getType()) { + case RECORD: + // Object shredding: each field has a nested {value, typed_value} sub-struct + List<VariantSchema.ObjectField> fields = new ArrayList<>(); + for (Schema.Field field : tvSchema.getFields()) { + Schema fieldSchema = unwrapNullable(field.schema()); + VariantSchema subSchema = buildVariantSchema(fieldSchema, false, builder); + fields.add(new VariantSchema.ObjectField(field.name(), subSchema)); + } + objectSchema = fields.toArray(new VariantSchema.ObjectField[0]); + break; + + case ARRAY: + // Array shredding: elements follow the shredding schema + Schema elementSchema = unwrapNullable(tvSchema.getElementType()); + arraySchema = buildVariantSchema(elementSchema, false, builder); + break; + + default: + // Scalar shredding + scalarSchema = avroTypeToScalarType(tvSchema); + break; + } + } + + VariantSchema result = new VariantSchema( + typedIdx, variantIdx, topLevelMetadataIdx, numFields, + scalarSchema, objectSchema, arraySchema); + + // The read (rebuild) path passes a null builder: it needs the VariantSchema indices but no + // Avro-schema registration (registration only feeds write-side result construction). + if (builder != null) { + builder.registerSchema(result, avroSchema); + } + + return result; + } + + /** + * Maps an Avro {@link Schema} type (potentially with logical type annotations) + * to a {@link VariantSchema.ScalarType}. 
+ */ + private VariantSchema.ScalarType avroTypeToScalarType(Schema schema) { + LogicalType logicalType = schema.getLogicalType(); + + // Check logical types first + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; + return new VariantSchema.DecimalType(decimal.getPrecision(), decimal.getScale()); + } + String name = logicalType.getName(); + if ("date".equals(name)) { + return new VariantSchema.DateType(); + } + if ("timestamp-micros".equals(name)) { + return new VariantSchema.TimestampType(); + } Review Comment: 🤖 Mapping both `timestamp-millis` and `timestamp-micros` (and similarly local-* variants) to the same `VariantSchema.TimestampType()`/`TimestampNTZType()` looks suspicious — if Spark's `VariantSchema.TimestampType` has fixed micros precision (variant binary format stores µs), then a millis Long would be interpreted as µs and be off by 1000x. In the inferred-write flow this branch is dead (Hudi always generates `timestamp-micros` via `HoodieSchema.createTimestampMicros()`), but the read/rebuild path could in principle hit an Avro schema with millis. Is this branch intended as a true equivalence, or should millis schemas decline shredding (return null) so rebuild doesn't silently misinterpret values? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java: ########## @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.variant; + +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.common.schema.HoodieSchema; + +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.types.variant.ShreddingUtils; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.types.variant.VariantSchema; +import org.apache.spark.types.variant.VariantShreddingWriter; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Implementation of {@link VariantShreddingProvider} using Spark 4's variant parsing library. + * + * <p>This class bridges the Avro record path and Spark's {@link VariantShreddingWriter} + * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. 
It converts + * the shredded output into Avro {@link GenericRecord}s that can be written via + * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p> + * + * <p>The shredding logic is delegated to {@link VariantShreddingWriter#castShredded}, + * which handles scalar, object, and array shredding including residual value construction + * for non-matching fields. This class implements the {@link ShreddedResult} and + * {@link ShreddedResultBuilder} interfaces to collect the shredded components into + * Avro GenericRecords.</p> + */ +public class Spark4VariantShreddingProvider implements VariantShreddingProvider { + + private static final String VALUE_FIELD = "value"; + private static final String METADATA_FIELD = "metadata"; + private static final String TYPED_VALUE_FIELD = "typed_value"; + + @Override + public GenericRecord shredVariantRecord( + GenericRecord unshreddedVariant, + Schema shreddedSchema, + HoodieSchema.Variant variantSchema) { + + ByteBuffer valueBuf = (ByteBuffer) unshreddedVariant.get(VALUE_FIELD); + ByteBuffer metadataBuf = (ByteBuffer) unshreddedVariant.get(METADATA_FIELD); + + if (valueBuf == null || metadataBuf == null) { + return null; + } + + byte[] valueBytes = toByteArray(valueBuf); + byte[] metadataBytes = toByteArray(metadataBuf); + + Variant variant = new Variant(valueBytes, metadataBytes); + + // Build VariantSchema from the Avro shredded schema, registering + // Avro schemas at each level for GenericRecord construction. + AvroShreddedResultBuilder builder = new AvroShreddedResultBuilder(); + VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, builder); + + // Delegate to Spark's VariantShreddingWriter for the actual shredding logic. 
+ AvroShreddedResult result = (AvroShreddedResult) + VariantShreddingWriter.castShredded(variant, sparkSchema, builder); + + return result.toGenericRecord(); + } + + @Override + public GenericRecord rebuildVariantRecord( + GenericRecord shreddedVariant, + Schema shreddedSchema, + Schema unshreddedSchema) { + + if (shreddedVariant == null) { + return null; + } + ByteBuffer metadataBuf = (ByteBuffer) shreddedVariant.get(METADATA_FIELD); + if (metadataBuf == null) { + return null; + } + + // Reuse the same VariantSchema index assignment as the write path (no builder needed on read). + VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, null); + + // Delegate to Spark's reconstruction algorithm (inverse of castShredded). + Variant variant = ShreddingUtils.rebuild(new AvroVariantRow(shreddedVariant, sparkSchema), sparkSchema); + + GenericRecord out = new GenericData.Record(unshreddedSchema); + out.put(METADATA_FIELD, ByteBuffer.wrap(variant.getMetadata())); + out.put(VALUE_FIELD, ByteBuffer.wrap(variant.getValue())); + return out; + } + + /** + * Builds a {@link VariantSchema} from an Avro {@link Schema} representing a + * shredded variant structure ({@code value}, {@code metadata}, {@code typed_value}). + * + * <p>This method also registers the Avro schema mapping in the builder so that + * {@link AvroShreddedResultBuilder#createEmpty} can create results with the + * correct Avro schema at each nesting level.</p> + */ + private VariantSchema buildVariantSchema(Schema avroSchema, boolean isTopLevel, + AvroShreddedResultBuilder builder) { + Schema.Field valueField = avroSchema.getField(VALUE_FIELD); + Schema.Field metadataField = avroSchema.getField(METADATA_FIELD); + Schema.Field typedValueField = avroSchema.getField(TYPED_VALUE_FIELD); + + int idx = 0; + int variantIdx = valueField != null ? 
idx++ : -1; + int topLevelMetadataIdx; + if (metadataField != null && isTopLevel) { + topLevelMetadataIdx = idx++; + } else { + topLevelMetadataIdx = -1; + if (metadataField != null) { + idx++; + } + } + int typedIdx = typedValueField != null ? idx++ : -1; + int numFields = idx; + + VariantSchema.ScalarType scalarSchema = null; + VariantSchema.ObjectField[] objectSchema = null; + VariantSchema arraySchema = null; + + if (typedValueField != null) { + Schema tvSchema = unwrapNullable(typedValueField.schema()); + + switch (tvSchema.getType()) { + case RECORD: + // Object shredding: each field has a nested {value, typed_value} sub-struct + List<VariantSchema.ObjectField> fields = new ArrayList<>(); + for (Schema.Field field : tvSchema.getFields()) { + Schema fieldSchema = unwrapNullable(field.schema()); + VariantSchema subSchema = buildVariantSchema(fieldSchema, false, builder); + fields.add(new VariantSchema.ObjectField(field.name(), subSchema)); + } + objectSchema = fields.toArray(new VariantSchema.ObjectField[0]); + break; + + case ARRAY: + // Array shredding: elements follow the shredding schema + Schema elementSchema = unwrapNullable(tvSchema.getElementType()); + arraySchema = buildVariantSchema(elementSchema, false, builder); + break; + + default: + // Scalar shredding + scalarSchema = avroTypeToScalarType(tvSchema); + break; + } + } + + VariantSchema result = new VariantSchema( + typedIdx, variantIdx, topLevelMetadataIdx, numFields, + scalarSchema, objectSchema, arraySchema); + + // The read (rebuild) path passes a null builder: it needs the VariantSchema indices but no + // Avro-schema registration (registration only feeds write-side result construction). + if (builder != null) { + builder.registerSchema(result, avroSchema); + } + + return result; + } + + /** + * Maps an Avro {@link Schema} type (potentially with logical type annotations) + * to a {@link VariantSchema.ScalarType}. 
+ */ + private VariantSchema.ScalarType avroTypeToScalarType(Schema schema) { + LogicalType logicalType = schema.getLogicalType(); + + // Check logical types first + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; + return new VariantSchema.DecimalType(decimal.getPrecision(), decimal.getScale()); + } + String name = logicalType.getName(); + if ("date".equals(name)) { + return new VariantSchema.DateType(); + } + if ("timestamp-micros".equals(name)) { + return new VariantSchema.TimestampType(); + } + if ("local-timestamp-micros".equals(name)) { + return new VariantSchema.TimestampNTZType(); + } + if ("timestamp-millis".equals(name)) { + return new VariantSchema.TimestampType(); + } + if ("local-timestamp-millis".equals(name)) { + return new VariantSchema.TimestampNTZType(); + } + if ("uuid".equals(name)) { + return new VariantSchema.UuidType(); + } + } + + switch (schema.getType()) { + case BOOLEAN: + return new VariantSchema.BooleanType(); + case INT: + return new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT); + case LONG: + return new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG); + case FLOAT: + return new VariantSchema.FloatType(); + case DOUBLE: + return new VariantSchema.DoubleType(); + case STRING: + return new VariantSchema.StringType(); + case BYTES: + return new VariantSchema.BinaryType(); + case FIXED: + return new VariantSchema.BinaryType(); + default: + return null; + } + } + + private static Schema unwrapNullable(Schema schema) { + if (schema.getType() == Schema.Type.UNION) { Review Comment: 🤖 nit: `BYTES` and `FIXED` both produce `new VariantSchema.BinaryType()` — could you collapse them into a single fall-through case (`case BYTES: case FIXED: return new VariantSchema.BinaryType();`) to avoid the duplication? <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java: ########## @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.variant; + +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.common.schema.HoodieSchema; + +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.types.variant.ShreddingUtils; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.types.variant.VariantSchema; +import org.apache.spark.types.variant.VariantShreddingWriter; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResult; +import org.apache.spark.types.variant.VariantShreddingWriter.ShreddedResultBuilder; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Implementation of {@link VariantShreddingProvider} using Spark 4's variant parsing library. + * + * <p>This class bridges the Avro record path and Spark's {@link VariantShreddingWriter} + * to allow {@code HoodieRecordType.AVRO} to write shredded variant types. It converts + * the shredded output into Avro {@link GenericRecord}s that can be written via + * {@link org.apache.hudi.avro.HoodieAvroWriteSupport}.</p> + * + * <p>The shredding logic is delegated to {@link VariantShreddingWriter#castShredded}, + * which handles scalar, object, and array shredding including residual value construction + * for non-matching fields. 
This class implements the {@link ShreddedResult} and + * {@link ShreddedResultBuilder} interfaces to collect the shredded components into + * Avro GenericRecords.</p> + */ +public class Spark4VariantShreddingProvider implements VariantShreddingProvider { + + private static final String VALUE_FIELD = "value"; + private static final String METADATA_FIELD = "metadata"; + private static final String TYPED_VALUE_FIELD = "typed_value"; + + @Override + public GenericRecord shredVariantRecord( + GenericRecord unshreddedVariant, + Schema shreddedSchema, + HoodieSchema.Variant variantSchema) { + + ByteBuffer valueBuf = (ByteBuffer) unshreddedVariant.get(VALUE_FIELD); + ByteBuffer metadataBuf = (ByteBuffer) unshreddedVariant.get(METADATA_FIELD); + + if (valueBuf == null || metadataBuf == null) { + return null; + } + + byte[] valueBytes = toByteArray(valueBuf); + byte[] metadataBytes = toByteArray(metadataBuf); + + Variant variant = new Variant(valueBytes, metadataBytes); + + // Build VariantSchema from the Avro shredded schema, registering + // Avro schemas at each level for GenericRecord construction. + AvroShreddedResultBuilder builder = new AvroShreddedResultBuilder(); + VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, builder); + + // Delegate to Spark's VariantShreddingWriter for the actual shredding logic. + AvroShreddedResult result = (AvroShreddedResult) + VariantShreddingWriter.castShredded(variant, sparkSchema, builder); + + return result.toGenericRecord(); + } + + @Override + public GenericRecord rebuildVariantRecord( + GenericRecord shreddedVariant, + Schema shreddedSchema, + Schema unshreddedSchema) { + + if (shreddedVariant == null) { + return null; + } + ByteBuffer metadataBuf = (ByteBuffer) shreddedVariant.get(METADATA_FIELD); + if (metadataBuf == null) { + return null; + } + + // Reuse the same VariantSchema index assignment as the write path (no builder needed on read). 
+ VariantSchema sparkSchema = buildVariantSchema(shreddedSchema, true, null); + + // Delegate to Spark's reconstruction algorithm (inverse of castShredded). + Variant variant = ShreddingUtils.rebuild(new AvroVariantRow(shreddedVariant, sparkSchema), sparkSchema); + + GenericRecord out = new GenericData.Record(unshreddedSchema); + out.put(METADATA_FIELD, ByteBuffer.wrap(variant.getMetadata())); + out.put(VALUE_FIELD, ByteBuffer.wrap(variant.getValue())); + return out; + } + + /** + * Builds a {@link VariantSchema} from an Avro {@link Schema} representing a + * shredded variant structure ({@code value}, {@code metadata}, {@code typed_value}). + * + * <p>This method also registers the Avro schema mapping in the builder so that + * {@link AvroShreddedResultBuilder#createEmpty} can create results with the + * correct Avro schema at each nesting level.</p> + */ + private VariantSchema buildVariantSchema(Schema avroSchema, boolean isTopLevel, + AvroShreddedResultBuilder builder) { + Schema.Field valueField = avroSchema.getField(VALUE_FIELD); + Schema.Field metadataField = avroSchema.getField(METADATA_FIELD); + Schema.Field typedValueField = avroSchema.getField(TYPED_VALUE_FIELD); + + int idx = 0; + int variantIdx = valueField != null ? idx++ : -1; + int topLevelMetadataIdx; + if (metadataField != null && isTopLevel) { + topLevelMetadataIdx = idx++; + } else { + topLevelMetadataIdx = -1; + if (metadataField != null) { + idx++; + } + } + int typedIdx = typedValueField != null ? 
idx++ : -1; + int numFields = idx; + + VariantSchema.ScalarType scalarSchema = null; + VariantSchema.ObjectField[] objectSchema = null; + VariantSchema arraySchema = null; + + if (typedValueField != null) { + Schema tvSchema = unwrapNullable(typedValueField.schema()); + + switch (tvSchema.getType()) { + case RECORD: + // Object shredding: each field has a nested {value, typed_value} sub-struct + List<VariantSchema.ObjectField> fields = new ArrayList<>(); + for (Schema.Field field : tvSchema.getFields()) { + Schema fieldSchema = unwrapNullable(field.schema()); + VariantSchema subSchema = buildVariantSchema(fieldSchema, false, builder); + fields.add(new VariantSchema.ObjectField(field.name(), subSchema)); + } + objectSchema = fields.toArray(new VariantSchema.ObjectField[0]); + break; + + case ARRAY: + // Array shredding: elements follow the shredding schema + Schema elementSchema = unwrapNullable(tvSchema.getElementType()); + arraySchema = buildVariantSchema(elementSchema, false, builder); + break; + + default: + // Scalar shredding + scalarSchema = avroTypeToScalarType(tvSchema); + break; + } + } + + VariantSchema result = new VariantSchema( + typedIdx, variantIdx, topLevelMetadataIdx, numFields, + scalarSchema, objectSchema, arraySchema); + + // The read (rebuild) path passes a null builder: it needs the VariantSchema indices but no + // Avro-schema registration (registration only feeds write-side result construction). + if (builder != null) { + builder.registerSchema(result, avroSchema); + } + + return result; + } + + /** + * Maps an Avro {@link Schema} type (potentially with logical type annotations) + * to a {@link VariantSchema.ScalarType}. 
+ */ + private VariantSchema.ScalarType avroTypeToScalarType(Schema schema) { + LogicalType logicalType = schema.getLogicalType(); + + // Check logical types first + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; + return new VariantSchema.DecimalType(decimal.getPrecision(), decimal.getScale()); + } + String name = logicalType.getName(); + if ("date".equals(name)) { + return new VariantSchema.DateType(); + } + if ("timestamp-micros".equals(name)) { + return new VariantSchema.TimestampType(); + } + if ("local-timestamp-micros".equals(name)) { + return new VariantSchema.TimestampNTZType(); + } + if ("timestamp-millis".equals(name)) { + return new VariantSchema.TimestampType(); + } + if ("local-timestamp-millis".equals(name)) { + return new VariantSchema.TimestampNTZType(); + } + if ("uuid".equals(name)) { + return new VariantSchema.UuidType(); + } + } + + switch (schema.getType()) { + case BOOLEAN: + return new VariantSchema.BooleanType(); + case INT: + return new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT); + case LONG: + return new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG); + case FLOAT: + return new VariantSchema.FloatType(); + case DOUBLE: + return new VariantSchema.DoubleType(); + case STRING: + return new VariantSchema.StringType(); + case BYTES: + return new VariantSchema.BinaryType(); + case FIXED: + return new VariantSchema.BinaryType(); + default: + return null; + } + } + + private static Schema unwrapNullable(Schema schema) { + if (schema.getType() == Schema.Type.UNION) { + for (Schema type : schema.getTypes()) { + if (type.getType() != Schema.Type.NULL) { + return type; + } + } + } + return schema; + } + + private static byte[] toByteArray(ByteBuffer buffer) { + if (buffer.hasArray() && buffer.position() == 0 + && buffer.arrayOffset() == 0 + && buffer.remaining() == buffer.array().length) { + return buffer.array(); + } + 
byte[] bytes = new byte[buffer.remaining()]; + buffer.duplicate().get(bytes); + return bytes; + } + + /** + * {@link ShreddedResult} implementation that collects shredded variant components + * and converts them into an Avro {@link GenericRecord}. + */ + static class AvroShreddedResult implements ShreddedResult { + private final VariantSchema variantSchema; + private final Schema avroSchema; + + private byte[] metadata; + private byte[] variantValue; + private Object scalarValue; + private AvroShreddedResult[] objectFields; + private AvroShreddedResult[] arrayElements; + + AvroShreddedResult(VariantSchema variantSchema, Schema avroSchema) { + this.variantSchema = variantSchema; + this.avroSchema = avroSchema; + } + + @Override + public void addArray(ShreddedResult[] array) { + this.arrayElements = new AvroShreddedResult[array.length]; + for (int i = 0; i < array.length; i++) { + this.arrayElements[i] = (AvroShreddedResult) array[i]; + } + } + + @Override + public void addObject(ShreddedResult[] values) { + this.objectFields = new AvroShreddedResult[values.length]; + for (int i = 0; i < values.length; i++) { + this.objectFields[i] = (AvroShreddedResult) values[i]; + } + } + + @Override + public void addVariantValue(byte[] result) { + this.variantValue = result; + } + + @Override + public void addScalar(Object result) { + this.scalarValue = result; + } + + @Override + public void addMetadata(byte[] result) { + this.metadata = result; + } + + /** + * Converts the collected shredded components into an Avro {@link GenericRecord}. 
+ */ + GenericRecord toGenericRecord() { + GenericRecord record = new GenericData.Record(avroSchema); + + // Metadata (only present at top level) + if (metadata != null) { + record.put(METADATA_FIELD, ByteBuffer.wrap(metadata)); + } + + // Value (variant binary for non-shredded or residual data) + Schema.Field valueField = avroSchema.getField(VALUE_FIELD); + if (valueField != null) { + if (variantValue != null) { + record.put(VALUE_FIELD, ByteBuffer.wrap(variantValue)); + } else { + record.put(VALUE_FIELD, null); + } + } + + // Typed value + Schema.Field tvField = avroSchema.getField(TYPED_VALUE_FIELD); + if (tvField == null) { + return record; + } + + if (scalarValue != null) { + Schema tvSchema = unwrapNullable(tvField.schema()); + record.put(TYPED_VALUE_FIELD, convertScalarToAvro(scalarValue, tvSchema)); + } else if (objectFields != null) { + Schema tvSchema = unwrapNullable(tvField.schema()); + GenericRecord tvRecord = new GenericData.Record(tvSchema); + for (int i = 0; i < objectFields.length; i++) { + String fieldName = variantSchema.objectSchema[i].fieldName; + // Always create the sub-record even for missing fields (non-null struct with null fields) + tvRecord.put(fieldName, objectFields[i].toGenericRecord()); + } + record.put(TYPED_VALUE_FIELD, tvRecord); + } else if (arrayElements != null) { + List<GenericRecord> list = new ArrayList<>(arrayElements.length); + for (AvroShreddedResult element : arrayElements) { + list.add(element.toGenericRecord()); + } + record.put(TYPED_VALUE_FIELD, list); + } else { + record.put(TYPED_VALUE_FIELD, null); + } + + return record; + } + + /** + * Converts a scalar value from Spark's variant representation to an Avro-compatible type. + * Handles type widening (Byte/Short to Int/Long) and binary wrapping. 
+ */ + private static Object convertScalarToAvro(Object value, Schema avroSchema) { + if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); + } + if (value instanceof UUID) { + return value.toString(); + } + // Widen integer types to match Avro schema expectations + if (avroSchema.getType() == Schema.Type.INT) { + if (value instanceof Byte) { + return ((Byte) value).intValue(); + } + if (value instanceof Short) { + return ((Short) value).intValue(); + } + } + if (avroSchema.getType() == Schema.Type.LONG) { + if (value instanceof Byte) { + return ((Byte) value).longValue(); + } + if (value instanceof Short) { + return ((Short) value).longValue(); + } + if (value instanceof Integer) { + return ((Integer) value).longValue(); + } + } + // BigDecimal, Boolean, String, Integer, Long, Float, Double + // are directly compatible with Avro's type system + return value; + } + } + + /** + * {@link ShreddedResultBuilder} that creates {@link AvroShreddedResult} instances + * with the corresponding Avro schema at each nesting level. + */ + static class AvroShreddedResultBuilder implements ShreddedResultBuilder { + private final Map<VariantSchema, Schema> schemaMap = new IdentityHashMap<>(); + + void registerSchema(VariantSchema variantSchema, Schema avroSchema) { + schemaMap.put(variantSchema, avroSchema); + } + + @Override + public ShreddedResult createEmpty(VariantSchema schema) { + Schema avroSchema = schemaMap.get(schema); + if (avroSchema == null) { + throw new IllegalStateException( + "No Avro schema registered for VariantSchema: " + schema); + } + return new AvroShreddedResult(schema, avroSchema); + } + + @Override + public boolean allowNumericScaleChanges() { + return true; + } + } + + /** + * Base {@link ShreddingUtils.ShreddedRow} with all accessors throwing; concrete rows override + * only the accessors valid for their nesting context. 
This is the read-path mirror of the + * write-path {@link AvroShreddedResult}: it reads Avro records to feed Spark's reconstruction + * ({@link ShreddingUtils#rebuild}). + */ + abstract static class BaseAvroShreddedRow implements ShreddingUtils.ShreddedRow { + @Override + public boolean isNullAt(int ordinal) { + throw unsupported(); + } + + @Override + public boolean getBoolean(int ordinal) { + throw unsupported(); + } + + @Override + public byte getByte(int ordinal) { + throw unsupported(); + } + + @Override + public short getShort(int ordinal) { + throw unsupported(); + } + + @Override + public int getInt(int ordinal) { + throw unsupported(); + } + + @Override + public long getLong(int ordinal) { + throw unsupported(); + } + + @Override + public float getFloat(int ordinal) { + throw unsupported(); + } + + @Override + public double getDouble(int ordinal) { + throw unsupported(); + } + + @Override + public BigDecimal getDecimal(int ordinal, int precision, int scale) { + throw unsupported(); + } + + @Override + public String getString(int ordinal) { + throw unsupported(); + } + + @Override + public byte[] getBinary(int ordinal) { + throw unsupported(); + } + + @Override + public UUID getUuid(int ordinal) { + throw unsupported(); + } + + @Override + public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) { + throw unsupported(); + } + + @Override + public ShreddingUtils.ShreddedRow getArray(int ordinal) { + throw unsupported(); + } + + @Override + public int numElements() { + throw unsupported(); + } + + private static UnsupportedOperationException unsupported() { + return new UnsupportedOperationException("Accessor not valid for this shredded row context"); + } + } + + /** + * A shredded variant struct {@code {value, [metadata], typed_value}}. Maps the Spark + * {@link VariantSchema} ordinals (variantIdx / topLevelMetadataIdx / typedIdx) back to the named + * Avro fields, and reads {@code typed_value} for scalar/object/array reconstruction. 
+ */ + static final class AvroVariantRow extends BaseAvroShreddedRow { + private final GenericRecord record; + private final VariantSchema schema; + + AvroVariantRow(GenericRecord record, VariantSchema schema) { + this.record = record; + this.schema = schema; + } + + private String fieldNameFor(int ordinal) { + if (ordinal == schema.typedIdx) { + return TYPED_VALUE_FIELD; + } + if (ordinal == schema.variantIdx) { + return VALUE_FIELD; + } + if (ordinal == schema.topLevelMetadataIdx) { + return METADATA_FIELD; + } + throw new IllegalArgumentException("Unexpected shredded ordinal: " + ordinal); + } + + @Override public boolean isNullAt(int ordinal) { + return record.get(fieldNameFor(ordinal)) == null; + } + + @Override public byte[] getBinary(int ordinal) { + return toByteArray((ByteBuffer) record.get(fieldNameFor(ordinal))); + } + + @Override public boolean getBoolean(int ordinal) { + return (Boolean) record.get(TYPED_VALUE_FIELD); + } + + @Override public byte getByte(int ordinal) { + return ((Number) record.get(TYPED_VALUE_FIELD)).byteValue(); + } + + @Override public short getShort(int ordinal) { + return ((Number) record.get(TYPED_VALUE_FIELD)).shortValue(); + } + + @Override public int getInt(int ordinal) { + return ((Number) record.get(TYPED_VALUE_FIELD)).intValue(); + } + + @Override public long getLong(int ordinal) { + return ((Number) record.get(TYPED_VALUE_FIELD)).longValue(); + } Review Comment: 🤖 nit: `getBoolean` (and the other scalar getters below) silently ignore `ordinal` and hardcode `TYPED_VALUE_FIELD`, while `getBinary` on line 576 goes through `fieldNameFor(ordinal)`. A one-line comment along the lines of `// Spark always invokes scalar getters with typedIdx, so TYPED_VALUE_FIELD is always correct here` would save a future reader from wondering whether the inconsistency is intentional. <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/VariantShreddingInferenceInternalRowFileWriter.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.SparkAdapterSupport$; +import org.apache.hudi.avro.VariantShreddingSchemaInferrer; +import org.apache.hudi.avro.VariantShreddingSchemaInferrer.VariantSample; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.io.storage.VariantShreddingInferenceFileWriter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieInternalRowFileWriter} decorator that infers a per-file variant shredding + * schema from the first rows before opening the real parquet writer; the row-writer-path + * sibling of {@link VariantShreddingInferenceFileWriter}, sharing its buffering thresholds and + * failure semantics. + * + * <p>Meta columns including the commit seqno are composed into the row by the handle BEFORE + * {@code writeRow}, so ordered replay is value-exact here by construction. Rows and keys are + * copied because Spark iterators reuse their instances.</p> + */ +@Slf4j +public class VariantShreddingInferenceInternalRowFileWriter implements HoodieInternalRowFileWriter { + + /** Creates the real row file writer once the inferred typed_value schemas are known. 
*/ + @FunctionalInterface + public interface InferredRowWriterFactory { + HoodieInternalRowFileWriter create(Map<String, HoodieSchema> inferredTypedValues) throws IOException; + } + + private final List<String> variantColumns; + private final int[] ordinals; + private final VariantShreddingSchemaInferrer inferrer; + private final InferredRowWriterFactory writerFactory; + private final long maxBufferedBytes; + private final DefaultSizeEstimator<InternalRow> sizeEstimator = new DefaultSizeEstimator<>(); + + private static final int SIZE_ESTIMATE_INTERVAL = 100; + + private final List<BufferedRow> buffer = new ArrayList<>(); Review Comment: 🤖 nit: `SIZE_ESTIMATE_INTERVAL` is declared after the instance fields, but the sibling class `VariantShreddingInferenceFileWriter` places it (along with the other static constants) at the top of the class body. Could you move it up above the instance field block to stay consistent? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala: ########## @@ -56,12 +56,41 @@ trait HoodieHadoopFsRelationFactory { def buildOptions(): Map[String, String] } +object HoodieBaseHadoopFsRelationFactory { + /** + * Resolves the variant allow-reading-shredded value using the precedence: + * table option > hoodie session conf > explicit Spark conf > Hudi default. 
+ */ + private[hudi] def resolveVariantAllowReadingShredded(tableOption: Option[String], + hoodieSessionValue: Option[String], + sparkConfValue: Option[String], + hudiDefault: String): String = + tableOption.orElse(hoodieSessionValue).orElse(sparkConfValue).getOrElse(hudiDefault) +} + abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext, val metaClient: HoodieTableMetaClient, val options: Map[String, String], val schemaSpec: Option[StructType], val isBootstrap: Boolean ) extends SparkAdapterSupport with HoodieHadoopFsRelationFactory with Logging { + // Propagate Hudi's variant allow-reading-shredded config to Spark's SQLConf. + // ParquetToSparkSchemaConverter reads this from SQLConf.get(), so it must be set + // before query execution starts here during table resolution + if (HoodieSparkUtils.gteqSpark4_0) { + val sqlConf = sqlContext.sparkSession.sessionState.conf + val hoodieConfKey = HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key + // Literal, not SQLConf.VARIANT_ALLOW_READING_SHREDDED.key: that field is absent when this module compiles against Spark 3.x. + val sparkConfKey = "spark.sql.variant.allowReadingShredded" + // Precedence: table option > hoodie session key > explicit Spark conf > Hudi default. 
+ val allowReadingShredded = HoodieBaseHadoopFsRelationFactory.resolveVariantAllowReadingShredded( + options.get(hoodieConfKey), + if (sqlConf.contains(hoodieConfKey)) Some(sqlConf.getConfString(hoodieConfKey)) else None, + if (sqlConf.contains(sparkConfKey)) Some(sqlConf.getConfString(sparkConfKey)) else None, + HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue.toString) + sqlConf.setConfString(sparkConfKey, allowReadingShredded) Review Comment: 🤖 This mutates a session-level SQLConf inside the relation factory constructor, which has cross-query side effects: a subsequent read of a non-Hudi parquet (or a different Hudi table whose options resolve differently) will inherit whatever value the last factory call wrote. With concurrent reads on the same SparkSession, two relation factories can race on this key and the second write wins for both queries. @yihua could you confirm whether this trade-off was considered, and whether there's an alternative that scopes the conf to the current query (e.g. via SQLConf.withExistingConf or pushing it into the read's Hadoop configuration rather than the session conf)? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
