voonhous commented on code in PR #18961:
URL: https://github.com/apache/hudi/pull/18961#discussion_r3441691109


##########
hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/VariantReconstruction.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage.hadoop;
+
+import org.apache.hudi.avro.VariantSchemaUtils;
+import org.apache.hudi.avro.VariantShreddingProvider;
+import org.apache.hudi.avro.VariantShreddingRuntime;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reconstructs unshredded variants when reading an already-shredded base file 
on the Avro
+ * ({@code HoodieRecordType.AVRO}) read path.
+ *
+ * <p>parquet-avro does not understand variant shredding, so a shredded 
variant column comes back as
+ * a raw {@code {metadata, value, typed_value}} record. This reads such 
columns at their shredded
+ * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code 
{metadata, value}}
+ * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before 
records reach the
+ * merger/writer. The Spark/InternalRow read path reconstructs natively and 
does not use this.
+ *
+ * <p>See https://github.com/apache/hudi/issues/18931.
+ */
+final class VariantReconstruction {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(VariantReconstruction.class);
+
+  private final HoodieSchema intermediateSchema; // read schema; see intermediateSchema()
+  private final Schema outputAvroSchema; // schema of records built by reconstruct()
+  private final VariantShreddingProvider provider; // performs the per-column rebuild
+  // Indexed by field position in the (requested == output) record. For target 
fields, the file's
+  // shredded sub-schema and the unshredded target sub-schema for rebuild; 
null for non-targets.
+  private final boolean[] isTarget; // true at positions flagged in create()
+  private final Schema[] shreddedSubSchemas;
+  private final Schema[] unshreddedSubSchemas;
+
+  private VariantReconstruction(HoodieSchema intermediateSchema, Schema 
outputAvroSchema,
+                                VariantShreddingProvider provider, boolean[] 
isTarget,
+                                Schema[] shreddedSubSchemas, Schema[] 
unshreddedSubSchemas) { // private; create() is the only caller visible here
+    this.intermediateSchema = intermediateSchema; // plain assignment, no validation
+    this.outputAvroSchema = outputAvroSchema;
+    this.provider = provider;
+    this.isTarget = isTarget; // arrays are stored by reference, not copied
+    this.shreddedSubSchemas = shreddedSubSchemas;
+    this.unshreddedSubSchemas = unshreddedSubSchemas;
+  }
+
+  /**
+   * Schema to read the parquet file with: the requested schema, but with 
shredded variant columns
+   * swapped to their file (typed_value-bearing) form so parquet-avro 
materializes {@code typed_value}.
+   *
+   * @return the intermediate schema stored at construction time
+   */
+  HoodieSchema intermediateSchema() {
+    return intermediateSchema;
+  }
+
+  /**
+   * Builds a reconstruction for the given file and requested schemas, or 
returns {@code null} when
+   * none is needed (no shredded variant columns in the file, reading shredded 
variants disabled, or
+   * no provider available - in which case the read proceeds unchanged).
+   *
+   * <p>Also returns {@code null} when either schema is not a record.
+   *
+   * @param fileSchema schema of the file; fields are looked up by requested name
+   * @param requestedSchema schema whose field order defines all positions here
+   * @param storage source of the configuration and argument to loadProvider
+   * @return a reconstruction, or {@code null} in the cases listed above
+   */
+  static VariantReconstruction create(HoodieSchema fileSchema, HoodieSchema 
requestedSchema, HoodieStorage storage) {
+    if (requestedSchema.getType() != HoodieSchemaType.RECORD || 
fileSchema.getType() != HoodieSchemaType.RECORD) {
+      return null; // only record schemas have fields to match up
+    }
+    if 
(!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(),
+        
HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.defaultValue())) {
+      return null; // reading shredded variants is switched off
+    }
+
+    List<HoodieSchemaField> requestedFields = requestedSchema.getFields();
+    List<HoodieSchemaField> intermediateFields = new ArrayList<>();
+    boolean[] isTarget = new boolean[requestedFields.size()];
+    boolean anyTarget = false;
+    for (int i = 0; i < requestedFields.size(); i++) { // one entry per requested field, order kept
+      HoodieSchemaField requestedField = requestedFields.get(i);
+      Option<HoodieSchemaField> fileField = 
fileSchema.getField(requestedField.name());
+      if (fileField.isPresent() && 
isShreddedVariantTarget(requestedField.schema(), fileField.get().schema())) {
+        isTarget[i] = true;
+        anyTarget = true;
+        // Read this column in its on-disk shredded shape. 
createNewSchemaField: the requested
+        // schema's avro fields are position-attached and cannot be reused in 
a new record.
+        
intermediateFields.add(HoodieSchemaUtils.createNewSchemaField(requestedField.withSchema(fileField.get().schema())));
+      } else { // not a target: field carried over with its requested schema
+        
intermediateFields.add(HoodieSchemaUtils.createNewSchemaField(requestedField));
+      }
+    }
+    if (!anyTarget) {
+      return null; // no requested field qualified as a shredded variant target
+    }
+
+    VariantShreddingProvider provider = loadProvider(storage); // looked up only once a target exists
+    if (provider == null) {
+      LOG.warn("Base file has shredded variant column(s) but no 
VariantShreddingProvider is available; "
+          + "variants will not be reconstructed. Set {} or add a provider 
implementation to the classpath.",
+          HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key());
+      return null;
+    }
+
+    HoodieSchema intermediateSchema = HoodieSchema.createRecord(
+        requestedSchema.getAvroSchema().getName(),
+        requestedSchema.getAvroSchema().getNamespace(),
+        requestedSchema.getAvroSchema().getDoc(),
+        intermediateFields);
+    // Records leave this reader unshredded; output field order matches the 
requested/intermediate order.
+    HoodieSchema outputSchema = 
VariantSchemaUtils.stripVariantShredding(requestedSchema);
+
+    Schema[] shreddedSubSchemas = new Schema[requestedFields.size()];
+    Schema[] unshreddedSubSchemas = new Schema[requestedFields.size()];
+    for (int i = 0; i < requestedFields.size(); i++) {
+      if (isTarget[i]) { // non-target slots stay null
+        shreddedSubSchemas[i] = 
nonNull(fileSchema.getField(requestedFields.get(i).name()).get().schema()).getAvroSchema();
+        unshreddedSubSchemas[i] = 
nonNull(outputSchema.getFields().get(i).schema()).getAvroSchema();
+      }
+    }
+
+    return new VariantReconstruction(intermediateSchema, 
outputSchema.toAvroSchema(), provider,
+        isTarget, shreddedSubSchemas, unshreddedSubSchemas);
+  }
+
+  /**
+   * Rebuilds shredded variant columns of {@code in} (read in the intermediate 
shredded shape) into
+   * a record conforming to the unshredded output schema.
+   *
+   * <p>Values are read and written by position. A value at a target position
+   * that is not a {@code GenericRecord} (including null) is copied as-is.
+   *
+   * @param in record whose positions line up with the requested field order
+   * @return a newly allocated record using the output Avro schema
+   */
+  IndexedRecord reconstruct(IndexedRecord in) {
+    GenericRecord out = new GenericData.Record(outputAvroSchema); // fresh record per call
+    for (int i = 0; i < isTarget.length; i++) {
+      Object value = in.get(i);
+      if (isTarget[i] && value instanceof GenericRecord) {
+        out.put(i, provider.rebuildVariantRecord((GenericRecord) value, 
shreddedSubSchemas[i], unshreddedSubSchemas[i]));
+      } else {
+        // Non-variant column, or a null variant column: pass through 
unchanged.
+        out.put(i, value);
+      }
+    }
+    return out;
+  }
+
+  /**
+   * Whether this column must be read in its on-disk shredded shape and 
reconstructed. The file
+   * schema comes from converting the parquet footer MessageType, which loses 
the variant
+   * logical type (variant groups come back as plain records), so the on-disk 
side is detected
+   * by SHAPE, anchored by the requested side: the requested column (from the 
table schema,
+   * logical type intact) must be a variant for the shape match to count.
+   *
+   * @param requestedFieldSchema schema of the column as requested
+   * @param fileFieldSchema schema of the same-named column in the file
+   * @return true when the column should be treated as a reconstruction target
+   */
+  private static boolean isShreddedVariantTarget(HoodieSchema 
requestedFieldSchema, HoodieSchema fileFieldSchema) {
+    HoodieSchema file = nonNull(fileFieldSchema);
+    if (file.getType() == HoodieSchemaType.VARIANT && ((HoodieSchema.Variant) 
file).isShredded()) {
+      return true; // file side is typed as a shredded variant; requested side not consulted
+    }
+    HoodieSchema requested = nonNull(requestedFieldSchema);
+    return requested.getType() == HoodieSchemaType.VARIANT
+        && VariantSchemaUtils.isShreddedVariantShape(file);
+  }
+
+  private static HoodieSchema nonNull(HoodieSchema schema) {

Review Comment:
   Resolved in https://github.com/apache/hudi/pull/18938#discussion_r3434807504



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to