hudi-agent commented on code in PR #18938:
URL: https://github.com/apache/hudi/pull/18938#discussion_r3467710139
##########
hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java:
##########
@@ -63,4 +86,26 @@ GenericRecord shredVariantRecord(
GenericRecord unshreddedVariant,
Schema shreddedSchema,
HoodieSchema.Variant variantSchema);
+
+ /**
+ * Reconstruct an unshredded variant GenericRecord from a shredded one (the inverse of
+ * {@link #shredVariantRecord}).
+ * <p>
+ * Used on the read path: records read from an already-shredded base file (compaction/clustering)
+ * arrive with {@code typed_value} populated. This rebuilds the full variant binary so the record
+ * presents the standard unshredded {@code {metadata, value}} shape before it reaches the
+ * merger/writer.
+ *
+ * @param shreddedVariant GenericRecord with {value, metadata, typed_value} read from a shredded base file
+ * @param shreddedSchema the Avro schema of {@code shreddedVariant} (carries typed_value)
+ * @param unshreddedSchema target Avro schema with {value: ByteBuffer, metadata: ByteBuffer}
+ * @return a GenericRecord conforming to {@code unshreddedSchema} with the full reconstructed
+ * variant binary in {@code value}, or {@code null} when {@code shreddedVariant} is null
+ * @throws org.apache.hudi.exception.HoodieException if {@code shreddedVariant} is missing its
+ * required {@code metadata} field (a malformed shredded base file)
+ */
+ GenericRecord rebuildVariantRecord(
Review Comment:
🤖 nit: the method uses "rebuild" (`rebuildVariantRecord`) while the
orchestrating class is `HoodieVariantReconstruction` with a public method
`reconstruct()`, and the Javadoc for this very method opens with "Reconstruct
an unshredded variant…". Could you pick one term and use it consistently?
`reconstructVariantRecord` would align with the class name.
<sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.avro.VariantSchemaUtils;
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reconstructs unshredded variants when reading an already-shredded base file on the Avro
+ * ({@code HoodieRecordType.AVRO}) read path.
+ *
+ * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as
+ * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded
+ * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}}
+ * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the
+ * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this.
+ *
+ * <p>See https://github.com/apache/hudi/issues/18931.
+ */
+final class HoodieVariantReconstruction {
+
+ private final HoodieSchema intermediateSchema;
+ private final Schema outputAvroSchema;
+ private final VariantShreddingProvider provider;
+ // Indexed by field position in the (requested == output) record. For target fields, the file's
+ // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets.
+ private final boolean[] isTarget;
+ private final Schema[] shreddedSubSchemas;
+ private final Schema[] unshreddedSubSchemas;
+
+ private HoodieVariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema,
+ VariantShreddingProvider provider, boolean[] isTarget,
+ Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) {
+ this.intermediateSchema = intermediateSchema;
+ this.outputAvroSchema = outputAvroSchema;
+ this.provider = provider;
+ this.isTarget = isTarget;
+ this.shreddedSubSchemas = shreddedSubSchemas;
+ this.unshreddedSubSchemas = unshreddedSubSchemas;
+ }
+
+ /**
+ * Schema to read the parquet file with: the requested schema, but with shredded variant columns
+ * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}.
+ */
+ HoodieSchema intermediateSchema() {
+ return intermediateSchema;
+ }
+
+ /**
+ * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when
+ * none is needed (the file has no shredded variant columns). Throws when the file has shredded
+ * variant columns to reconstruct but reading shredded variants is disabled, or no provider is
+ * available: either way, reading at the unshredded schema would silently drop the typed_value payload.
+ */
+ static HoodieVariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) {
+ if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) {
+ return null;
+ }
+
+ List<HoodieSchemaField> requestedFields = requestedSchema.getFields();
+ List<HoodieSchemaField> intermediateFields = new ArrayList<>();
+ boolean[] isTarget = new boolean[requestedFields.size()];
+ boolean anyTarget = false;
+ for (int i = 0; i < requestedFields.size(); i++) {
+ HoodieSchemaField requestedField = requestedFields.get(i);
+ Option<HoodieSchemaField> fileField = fileSchema.getField(requestedField.name());
+ if (fileField.isPresent() && isShreddedVariant(fileField.get().schema())) {
+ isTarget[i] = true;
+ anyTarget = true;
+ // Read this column in its on-disk shredded shape.
+ intermediateFields.add(requestedField.withSchema(fileField.get().schema()));
+ } else {
+ // Copy non-target fields too (withSchema makes a fresh Avro Field): reusing the requested
+ // field's Avro Field, already bound to the requested record, would fail Schema.setFields with
+ // "Field already used" when building the intermediate record below.
+ intermediateFields.add(requestedField.withSchema(requestedField.schema()));
+ }
+ }
+ if (!anyTarget) {
+ // No shredded variant columns in the file: nothing to reconstruct, regardless of the flag.
+ return null;
+ }
+
+ if (!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(),
+ HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue())) {
+ // Reading at the unshredded schema would drop typed_value and silently corrupt variants whose
+ // payload lives there, so fail fast. Mirrors the no-provider branch and Spark's
+ // allowReadingShredded=false, which rejects shredded reads rather than discarding data.
+ throw new HoodieException("Base file has shredded variant column(s) but reading shredded variants is "
+ + "disabled (" + HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key()
+ + "=false). Enable it to reconstruct them; otherwise the typed_value payload would be silently dropped.");
+ }
+
+ VariantShreddingProvider provider = loadProvider(storage);
+ if (provider == null) {
+ // Reading would drop typed_value and silently corrupt variants whose payload lives there, so fail fast.
+ throw new HoodieException("Base file has shredded variant column(s) and reading shredded variants is "
+ + "enabled, but no VariantShreddingProvider is available to reconstruct them. Set "
+ + HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key()
+ + " or add a provider implementation (e.g. the Spark variant module) to the classpath.");
+ }
+
+ HoodieSchema intermediateSchema = HoodieSchema.createRecord(
+ requestedSchema.getAvroSchema().getName(),
+ requestedSchema.getAvroSchema().getNamespace(),
+ requestedSchema.getAvroSchema().getDoc(),
+ intermediateFields);
+ // Records leave this reader unshredded; output field order matches the requested/intermediate order.
+ HoodieSchema outputSchema = VariantSchemaUtils.stripVariantShredding(requestedSchema);
+
+ Schema[] shreddedSubSchemas = new Schema[requestedFields.size()];
+ Schema[] unshreddedSubSchemas = new Schema[requestedFields.size()];
+ for (int i = 0; i < requestedFields.size(); i++) {
+ if (isTarget[i]) {
+ shreddedSubSchemas[i] = unwrapNullable(fileSchema.getField(requestedFields.get(i).name()).get().schema()).getAvroSchema();
Review Comment:
🤖 nit: the `Option.get()` call is buried mid-chain here; its safety relies
on `isTarget[i]` having been set only when `fileField.isPresent()` (line 101),
but a reader has to trace back to see that. Could you extract
`fileSchema.getField(requestedFields.get(i).name())` to a local variable so the
invariant is visible at the point of use?
<sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
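A rough standalone sketch of the shape suggested in the comment above; it is
not the PR's code. As assumptions for illustration only, `java.util.Optional`
stands in for Hudi's `Option`, a `Map<String, String>` stands in for the file
schema, and every name (`LocalOptionSketch`, `getField`, `shreddedSubSchemas`)
is hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins: Optional plays the role of Hudi's Option, and the
// map plays the role of the file schema (field name -> shredded sub-schema).
public class LocalOptionSketch {

  // Stand-in for a schema field lookup that may find nothing.
  static Optional<String> getField(Map<String, String> fileSchema, String name) {
    return Optional.ofNullable(fileSchema.get(name));
  }

  static String[] shreddedSubSchemas(Map<String, String> fileSchema,
                                     List<String> requestedFields,
                                     boolean[] isTarget) {
    String[] result = new String[requestedFields.size()];
    for (int i = 0; i < requestedFields.size(); i++) {
      if (!isTarget[i]) {
        continue; // non-target positions stay null
      }
      String name = requestedFields.get(i);
      // The lookup lives in a local variable, so the presence invariant is
      // checked right where get() is called instead of mid-chain.
      Optional<String> fileField = getField(fileSchema, name);
      if (!fileField.isPresent()) {
        throw new IllegalStateException("Target field missing from file schema: " + name);
      }
      result[i] = fileField.get();
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> fileSchema = new HashMap<>();
    fileSchema.put("v", "shredded{metadata,value,typed_value}");
    String[] out = shreddedSubSchemas(
        fileSchema, Arrays.asList("id", "v"), new boolean[] {false, true});
    System.out.println(Arrays.toString(out));
  }
}
```

Whether to keep an explicit presence check or rely on the earlier
`isTarget[i]` assignment is a judgment call; the local variable alone already
makes the dependency readable.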