hudi-agent commented on code in PR #18961:
URL: https://github.com/apache/hudi/pull/18961#discussion_r3446975063
##########
hudi-spark-datasource/hudi-spark4-common/src/main/java/org/apache/hudi/variant/Spark4VariantShreddingProvider.java:
##########
@@ -398,4 +437,240 @@ public boolean allowNumericScaleChanges() {
return true;
}
}
+
+ /**
+ * Base {@link ShreddingUtils.ShreddedRow} with all accessors throwing;
concrete rows override
+ * only the accessors valid for their nesting context. This is the read-path
mirror of the
+ * write-path {@link AvroShreddedResult}: it reads Avro records to feed
Spark's reconstruction
+ * ({@link ShreddingUtils#rebuild}).
+ */
+ abstract static class BaseAvroShreddedRow implements
ShreddingUtils.ShreddedRow {
+ @Override
+ public boolean isNullAt(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public byte getByte(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public short getShort(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public int getInt(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public long getLong(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public float getFloat(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public double getDouble(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public BigDecimal getDecimal(int ordinal, int precision, int scale) {
+ throw unsupported();
+ }
+
+ @Override
+ public String getString(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public UUID getUuid(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public ShreddingUtils.ShreddedRow getStruct(int ordinal, int numFields) {
+ throw unsupported();
+ }
+
+ @Override
+ public ShreddingUtils.ShreddedRow getArray(int ordinal) {
+ throw unsupported();
+ }
+
+ @Override
+ public int numElements() {
+ throw unsupported();
+ }
+
+ private static UnsupportedOperationException unsupported() {
+ return new UnsupportedOperationException("Accessor not valid for this
shredded row context");
+ }
+ }
+
+ /**
+ * A shredded variant struct {@code {value, [metadata], typed_value}}. Maps
the Spark
+ * {@link VariantSchema} ordinals (variantIdx / topLevelMetadataIdx /
typedIdx) back to the named
+ * Avro fields, and reads {@code typed_value} for scalar/object/array
reconstruction.
+ */
+ static final class AvroVariantRow extends BaseAvroShreddedRow {
+ private final GenericRecord record;
+ private final VariantSchema schema;
+
+ AvroVariantRow(GenericRecord record, VariantSchema schema) {
+ this.record = record;
+ this.schema = schema;
+ }
+
+ private String fieldNameFor(int ordinal) {
+ if (ordinal == schema.typedIdx) {
+ return HoodieSchema.Variant.VARIANT_TYPED_VALUE_FIELD;
+ }
+ if (ordinal == schema.variantIdx) {
+ return HoodieSchema.Variant.VARIANT_VALUE_FIELD;
+ }
+ if (ordinal == schema.topLevelMetadataIdx) {
+ return HoodieSchema.Variant.VARIANT_METADATA_FIELD;
+ }
+ throw new IllegalArgumentException("Unexpected shredded ordinal: " +
ordinal);
+ }
+
+ @Override public boolean isNullAt(int ordinal) {
+ return record.get(fieldNameFor(ordinal)) == null;
+ }
+
+ @Override public byte[] getBinary(int ordinal) {
+ return toByteArray((ByteBuffer) record.get(fieldNameFor(ordinal)));
+ }
+
+ // The scalar getters below read typed_value directly: Spark only invokes
them for the scalar
Review Comment:
🤖 nit: could you add a brief inline comment on `getBoolean` (and the other
scalar getters) explaining why `ordinal` is ignored here — e.g. `// scalar
accessors are only called with schema.typedIdx`? `isNullAt` and `getBinary`
route through `fieldNameFor(ordinal)`, so a reader hitting this block will
naturally wonder whether the ordinal dispatch was accidentally left out.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/VariantShreddingInferenceInternalRowFileWriter.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.avro.VariantShreddingSchemaInferrer;
+import org.apache.hudi.avro.VariantShreddingSchemaInferrer.VariantSample;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.io.storage.VariantShreddingInferenceFileWriter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link HoodieInternalRowFileWriter} decorator that infers a per-file
variant shredding
+ * schema from the first rows before opening the real parquet writer; the
row-writer-path
+ * sibling of {@link VariantShreddingInferenceFileWriter}, sharing its
buffering thresholds and
+ * failure semantics.
+ *
+ * <p>Meta columns including the commit seqno are composed into the row by the
handle BEFORE
+ * {@code writeRow}, so ordered replay is value-exact here by construction.
Rows and keys are
+ * copied because Spark iterators reuse their instances.</p>
+ */
+@Slf4j
+public class VariantShreddingInferenceInternalRowFileWriter implements
HoodieInternalRowFileWriter {
+
+ /** Creates the real row file writer once the inferred typed_value schemas
are known. */
+ @FunctionalInterface
+ public interface InferredRowWriterFactory {
+ HoodieInternalRowFileWriter create(Map<String, HoodieSchema>
inferredTypedValues) throws IOException;
+ }
+
+ private final List<String> variantColumns;
+ private final int[] ordinals;
+ private final VariantShreddingSchemaInferrer inferrer;
+ private final InferredRowWriterFactory writerFactory;
+ private final long maxBufferedBytes;
+ private final DefaultSizeEstimator<InternalRow> sizeEstimator = new
DefaultSizeEstimator<>();
+
+ private static final int SIZE_ESTIMATE_INTERVAL = 100;
+
+ private final List<BufferedRow> buffer = new ArrayList<>();
Review Comment:
🤖 nit: `SIZE_ESTIMATE_INTERVAL` is declared between instance fields rather
than with the other constants — could you move it to the top of the class? The
sibling `VariantShreddingInferenceFileWriter` puts `MAX_BUFFERED_RECORDS` /
`MAX_BUFFERED_BYTES` before all instance fields, so this stands out as
inconsistent.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]