hudi-agent commented on code in PR #18938:
URL: https://github.com/apache/hudi/pull/18938#discussion_r3467710139


##########
hudi-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java:
##########
@@ -63,4 +86,26 @@ GenericRecord shredVariantRecord(
       GenericRecord unshreddedVariant,
       Schema shreddedSchema,
       HoodieSchema.Variant variantSchema);
+
+  /**
+   * Reconstruct an unshredded variant GenericRecord from a shredded one (the inverse of
+   * {@link #shredVariantRecord}).
+   * <p>
+   * Used on the read path: records read from an already-shredded base file (compaction/clustering)
+   * arrive with {@code typed_value} populated. This rebuilds the full variant binary so the record
+   * presents the standard unshredded {@code {metadata, value}} shape before it reaches the
+   * merger/writer.
+   *
+   * @param shreddedVariant  GenericRecord with {value, metadata, typed_value} read from a shredded base file
+   * @param shreddedSchema   the Avro schema of {@code shreddedVariant} (carries typed_value)
+   * @param unshreddedSchema target Avro schema with {value: ByteBuffer, metadata: ByteBuffer}
+   * @return a GenericRecord conforming to {@code unshreddedSchema} with the full reconstructed
+   *         variant binary in {@code value}, or {@code null} when {@code shreddedVariant} is null
+   * @throws org.apache.hudi.exception.HoodieException if {@code shreddedVariant} is missing its
+   *         required {@code metadata} field (a malformed shredded base file)
+   */
+  GenericRecord rebuildVariantRecord(

Review Comment:
   🤖 nit: the method uses "rebuild" (`rebuildVariantRecord`), while the orchestrating class is `HoodieVariantReconstruction` with a public method `reconstruct()`, and the Javadoc for this very method opens with "Reconstruct an unshredded variant…". Could you pick one term and use it consistently? `reconstructVariantRecord` would align with the class name.
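A minimal sketch of the suggested naming, assuming hypothetical stand-ins: `Map<String, Object>` replaces Avro's `GenericRecord`, `IllegalStateException` replaces `HoodieException`, and `ToyVariantShreddingProvider` is an illustrative class, not part of Hudi. The toy implementation mirrors only the documented contract (null in gives null out; a missing `metadata` field is rejected) and does not rebuild the variant binary from `typed_value`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Hudi interface: Map<String, Object> plays the role of an
// Avro GenericRecord so the naming suggestion compiles without Avro or Hudi.
interface VariantShreddingProvider {
  // Suggested rename of rebuildVariantRecord: same verb as the Javadoc ("Reconstruct ...")
  // and as HoodieVariantReconstruction#reconstruct().
  Map<String, Object> reconstructVariantRecord(Map<String, Object> shreddedVariant);
}

// Toy provider: follows only the documented null/metadata contract. A real provider would
// merge typed_value back into the variant binary; this one just copies metadata and value.
final class ToyVariantShreddingProvider implements VariantShreddingProvider {
  @Override
  public Map<String, Object> reconstructVariantRecord(Map<String, Object> shreddedVariant) {
    if (shreddedVariant == null) {
      return null; // documented: null in, null out
    }
    if (!shreddedVariant.containsKey("metadata")) {
      // Stand-in for HoodieException: a malformed shredded base file.
      throw new IllegalStateException("shredded variant is missing its required metadata field");
    }
    Map<String, Object> unshredded = new HashMap<>();
    unshredded.put("metadata", shreddedVariant.get("metadata"));
    unshredded.put("value", shreddedVariant.get("value"));
    return unshredded; // unshredded {metadata, value} shape, no typed_value
  }
}
```

With one verb across the interface, its Javadoc, and the orchestrating class, a call site such as `provider.reconstructVariantRecord(...)` inside `reconstruct()` reads as a single concept.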
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.avro.VariantSchemaUtils;
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reconstructs unshredded variants when reading an already-shredded base file on the Avro
+ * ({@code HoodieRecordType.AVRO}) read path.
+ *
+ * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as
+ * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded
+ * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}}
+ * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the
+ * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this.
+ *
+ * <p>See https://github.com/apache/hudi/issues/18931.
+ */
+final class HoodieVariantReconstruction {
+
+  private final HoodieSchema intermediateSchema;
+  private final Schema outputAvroSchema;
+  private final VariantShreddingProvider provider;
+  // Indexed by field position in the (requested == output) record. For target fields, the file's
+  // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets.
+  private final boolean[] isTarget;
+  private final Schema[] shreddedSubSchemas;
+  private final Schema[] unshreddedSubSchemas;
+
+  private HoodieVariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema,
+                                VariantShreddingProvider provider, boolean[] isTarget,
+                                Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) {
+    this.intermediateSchema = intermediateSchema;
+    this.outputAvroSchema = outputAvroSchema;
+    this.provider = provider;
+    this.isTarget = isTarget;
+    this.shreddedSubSchemas = shreddedSubSchemas;
+    this.unshreddedSubSchemas = unshreddedSubSchemas;
+  }
+
+  /**
+   * Schema to read the parquet file with: the requested schema, but with shredded variant columns
+   * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}.
+   */
+  HoodieSchema intermediateSchema() {
+    return intermediateSchema;
+  }
+
+  /**
+   * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when
+   * none is needed (the file has no shredded variant columns). Throws when the file has shredded
+   * variant columns to reconstruct but reading shredded variants is disabled, or no provider is
+   * available: either way, reading at the unshredded schema would silently drop the typed_value payload.
+   */
+  static HoodieVariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) {
+    if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) {
+      return null;
+    }
+
+    List<HoodieSchemaField> requestedFields = requestedSchema.getFields();
+    List<HoodieSchemaField> intermediateFields = new ArrayList<>();
+    boolean[] isTarget = new boolean[requestedFields.size()];
+    boolean anyTarget = false;
+    for (int i = 0; i < requestedFields.size(); i++) {
+      HoodieSchemaField requestedField = requestedFields.get(i);
+      Option<HoodieSchemaField> fileField = fileSchema.getField(requestedField.name());
+      if (fileField.isPresent() && isShreddedVariant(fileField.get().schema())) {
+        isTarget[i] = true;
+        anyTarget = true;
+        // Read this column in its on-disk shredded shape.
+        intermediateFields.add(requestedField.withSchema(fileField.get().schema()));
+      } else {
+        // Copy non-target fields too (withSchema makes a fresh Avro Field): reusing the requested
+        // field's Avro Field, already bound to the requested record, would fail Schema.setFields with
+        // "Field already used" when building the intermediate record below.
+        intermediateFields.add(requestedField.withSchema(requestedField.schema()));
+      }
+    }
+    if (!anyTarget) {
+      // No shredded variant columns in the file: nothing to reconstruct, regardless of the flag.
+      return null;
+    }
+
+    if (!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(),
+        HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue())) {
+      // Reading at the unshredded schema would drop typed_value and silently corrupt variants whose
+      // payload lives there, so fail fast. Mirrors the no-provider branch and Spark's
+      // allowReadingShredded=false, which rejects shredded reads rather than discarding data.
+      throw new HoodieException("Base file has shredded variant column(s) but reading shredded variants is "
+          + "disabled (" + HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key()
+          + "=false). Enable it to reconstruct them; otherwise the typed_value payload would be silently dropped.");
+    }
+
+    VariantShreddingProvider provider = loadProvider(storage);
+    if (provider == null) {
+      // Reading would drop typed_value and silently corrupt variants whose payload lives there, so fail fast.
+      throw new HoodieException("Base file has shredded variant column(s) and reading shredded variants is "
+          + "enabled, but no VariantShreddingProvider is available to reconstruct them. Set "
+          + HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key()
+          + " or add a provider implementation (e.g. the Spark variant module) to the classpath.");
+    }
+
+    HoodieSchema intermediateSchema = HoodieSchema.createRecord(
+        requestedSchema.getAvroSchema().getName(),
+        requestedSchema.getAvroSchema().getNamespace(),
+        requestedSchema.getAvroSchema().getDoc(),
+        intermediateFields);
+    // Records leave this reader unshredded; output field order matches the requested/intermediate order.
+    HoodieSchema outputSchema = VariantSchemaUtils.stripVariantShredding(requestedSchema);
+
+    Schema[] shreddedSubSchemas = new Schema[requestedFields.size()];
+    Schema[] unshreddedSubSchemas = new Schema[requestedFields.size()];
+    for (int i = 0; i < requestedFields.size(); i++) {
+      if (isTarget[i]) {
+        shreddedSubSchemas[i] = unwrapNullable(fileSchema.getField(requestedFields.get(i).name()).get().schema()).getAvroSchema();

Review Comment:
   🤖 nit: the `Option.get()` call is buried mid-chain here; its safety relies on `isTarget[i]` having been set only when `fileField.isPresent()` (line 101), but a reader has to trace back to see that. Could you extract `fileSchema.getField(requestedFields.get(i).name())` to a local variable so the invariant is visible at the point of use?
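A sketch of the extraction, under assumed stand-ins: `java.util.Optional` plays the role of Hudi's `Option`, a `Map` from field name to a schema string plays `HoodieSchema`, and `ShreddedSubSchemaSketch` is a hypothetical helper. The `unwrapNullable(...).getAvroSchema()` step is omitted to keep the focus on where the lookup lives.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins: Optional for Hudi's Option, Map<fieldName, schemaString> for
// HoodieSchema. Shows only the shape of the suggested refactor, not the real Hudi types.
final class ShreddedSubSchemaSketch {

  static String[] shreddedSubSchemas(Map<String, String> fileSchema,
                                     List<String> requestedFieldNames,
                                     boolean[] isTarget) {
    String[] result = new String[requestedFieldNames.size()];
    for (int i = 0; i < requestedFieldNames.size(); i++) {
      if (isTarget[i]) {
        // Named lookup: the invariant sits next to the get(). isTarget[i] is only set
        // when the file schema contains this field, so the Optional is present here.
        Optional<String> fileField = Optional.ofNullable(fileSchema.get(requestedFieldNames.get(i)));
        result[i] = fileField.get();
      }
    }
    return result; // non-target slots stay null, as in the original arrays
  }
}
```

Naming the lookup keeps the `isTarget[i]` invariant visible at the `get()`; with the `Optional` stand-in, `orElseThrow(...)` with a message would document it further at the cost of an extra branch.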
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
