voonhous commented on code in PR #18938: URL: https://github.com/apache/hudi/pull/18938#discussion_r3458608572
########## hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage.hadoop; + +import org.apache.hudi.avro.VariantSchemaUtils; +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.HoodieStorage; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.util.ArrayList; +import java.util.List; + +/** + * Reconstructs unshredded variants when reading an already-shredded base file on the Avro + * ({@code HoodieRecordType.AVRO}) read path. + * + * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as + * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded + * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}} + * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the + * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this. + * + * <p>See https://github.com/apache/hudi/issues/18931. + */ +final class HoodieVariantReconstruction { + + private final HoodieSchema intermediateSchema; + private final Schema outputAvroSchema; + private final VariantShreddingProvider provider; + // Indexed by field position in the (requested == output) record. For target fields, the file's + // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets. + private final boolean[] isTarget; + private final Schema[] shreddedSubSchemas; + private final Schema[] unshreddedSubSchemas; + + private HoodieVariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema, + VariantShreddingProvider provider, boolean[] isTarget, + Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) { + this.intermediateSchema = intermediateSchema; + this.outputAvroSchema = outputAvroSchema; + this.provider = provider; + this.isTarget = isTarget; + this.shreddedSubSchemas = shreddedSubSchemas; + this.unshreddedSubSchemas = unshreddedSubSchemas; + } + + /** + * Schema to read the parquet file with: the requested schema, but with shredded variant columns + * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}. + */ + HoodieSchema intermediateSchema() { + return intermediateSchema; + } + + /** + * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when + * none is needed (no shredded variant columns in the file, or reading shredded variants disabled). + * Throws when the file has shredded variant columns to reconstruct but no provider is available, + * since proceeding would silently drop the typed_value payload. + */ + static HoodieVariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) { + if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) { + return null; + } + if (!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(), Review Comment: Good catch, fixed in a4ba4ad. Moved the `allow.reading.shredded` check below the shredded-column detection loop: return null when no shredded variant column is present (regardless of the flag), and throw when shredded columns ARE present but reading is disabled - mirroring the no-provider branch and Spark's `allowReadingShredded=false`, which rejects shredded reads rather than discarding data. So a shredded base file with reading disabled now fails fast instead of silently reading at the unshredded schema and dropping `typed_value`. Added a regression test (`TestHoodieVariantReconstruction`). ########## hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieVariantReconstruction.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.io.storage.hadoop; + +import org.apache.hudi.avro.VariantSchemaUtils; +import org.apache.hudi.avro.VariantShreddingProvider; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.storage.HoodieStorage; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.util.ArrayList; +import java.util.List; + +/** + * Reconstructs unshredded variants when reading an already-shredded base file on the Avro + * ({@code HoodieRecordType.AVRO}) read path. + * + * <p>parquet-avro does not understand variant shredding, so a shredded variant column comes back as + * a raw {@code {metadata, value, typed_value}} record. This reads such columns at their shredded + * (typed_value-bearing) schema, then rebuilds the standard unshredded {@code {metadata, value}} + * variant via {@link VariantShreddingProvider#rebuildVariantRecord} before records reach the + * merger/writer. The Spark/InternalRow read path reconstructs natively and does not use this. + * + * <p>See https://github.com/apache/hudi/issues/18931. + */ +final class HoodieVariantReconstruction { + + private final HoodieSchema intermediateSchema; + private final Schema outputAvroSchema; + private final VariantShreddingProvider provider; + // Indexed by field position in the (requested == output) record. For target fields, the file's + // shredded sub-schema and the unshredded target sub-schema for rebuild; null for non-targets. + private final boolean[] isTarget; + private final Schema[] shreddedSubSchemas; + private final Schema[] unshreddedSubSchemas; + + private HoodieVariantReconstruction(HoodieSchema intermediateSchema, Schema outputAvroSchema, + VariantShreddingProvider provider, boolean[] isTarget, + Schema[] shreddedSubSchemas, Schema[] unshreddedSubSchemas) { + this.intermediateSchema = intermediateSchema; + this.outputAvroSchema = outputAvroSchema; + this.provider = provider; + this.isTarget = isTarget; + this.shreddedSubSchemas = shreddedSubSchemas; + this.unshreddedSubSchemas = unshreddedSubSchemas; + } + + /** + * Schema to read the parquet file with: the requested schema, but with shredded variant columns + * swapped to their file (typed_value-bearing) form so parquet-avro materializes {@code typed_value}. + */ + HoodieSchema intermediateSchema() { + return intermediateSchema; + } + + /** + * Builds a reconstruction for the given file and requested schemas, or returns {@code null} when + * none is needed (no shredded variant columns in the file, or reading shredded variants disabled). + * Throws when the file has shredded variant columns to reconstruct but no provider is available, + * since proceeding would silently drop the typed_value payload. + */ + static HoodieVariantReconstruction create(HoodieSchema fileSchema, HoodieSchema requestedSchema, HoodieStorage storage) { + if (requestedSchema.getType() != HoodieSchemaType.RECORD || fileSchema.getType() != HoodieSchemaType.RECORD) { + return null; + } + if (!storage.getConf().getBoolean(HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED.key(), Review Comment: Done in a4ba4ad - moved the check below the `anyTarget` detection so it throws when shredded columns are present and reading is disabled, instead of returning null and dropping `typed_value`. Same fix as the parallel thread; added `TestHoodieVariantReconstruction` to cover it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
