laskoviymishka commented on code in PR #16370:
URL: https://github.com/apache/iceberg/pull/16370#discussion_r3442591964


##########
data/src/main/java/org/apache/iceberg/data/RecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.parquet.VariantShreddingAnalyzer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantValue;
+
+/**
+ * Generic {@link Record} implementation that extracts variant values from {@link Record#get(int)}
+ * using positional indices aligned with {@link Schema#columns()}.
+ *
+ * <p>Buffered rows must be laid out against the same {@link Schema} passed as {@code engineSchema};
+ * otherwise {@link Record#get(int)} positions will not match the resolved column indices.
+ */
+class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Schema> {
+
+  private final Map<Schema, Map<String, Integer>> columnIndicesBySchema = Maps.newHashMap();

Review Comment:
   This analyzer gets built once in `GenericFormatModels.register()` and lives 
on the `ParquetFormatModel` singleton, so this map is effectively 
process-global mutable state. `resolveColumnIndex` writes to it with 
`computeIfAbsent`, and if two writer threads (a Kafka Connect (KC) sink with 
multiple workers, say) hit it at once we get concurrent structural 
modification on a plain `HashMap`: corruption or a spin, not just a lost 
update.
   
   There's a second problem stacked on it: `Schema` doesn't override 
`equals`/`hashCode`, so the map is keyed on object identity, and nothing ever 
evicts entries. A long-lived sink that keeps seeing new `Schema` instances, as 
it would across schema evolution, grows it without bound.
   
   The Spark analyzer stays stateless precisely to dodge both. I'd drop the 
cache entirely and build the index map locally in `resolveColumnIndex` — it's 
one linear scan over <100 top-level fields per buffered batch, which is noise 
on this path. wdyt?
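   
   Roughly what I have in mind, as a sketch only. `ColumnIndexSketch` and its 
methods are made-up names, and a plain `List<String>` stands in for walking 
`Schema#columns()`; the point is just that the map is a local, so there is 
nothing shared to race on or to leak:
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical stand-in: real code would iterate Schema#columns() and read each field name.
   final class ColumnIndexSketch {
     private ColumnIndexSketch() {}

     // Builds a fresh name -> position map on every call; no state outlives the call.
     static Map<String, Integer> indexByName(List<String> columnNames) {
       Map<String, Integer> indices = new HashMap<>();
       for (int pos = 0; pos < columnNames.size(); pos += 1) {
         indices.put(columnNames.get(pos), pos);
       }
       return indices;
     }

     // Returns the position of the named column, or -1 when the column is absent.
     static int resolveColumnIndex(List<String> columnNames, String name) {
       return indexByName(columnNames).getOrDefault(name, -1);
     }
   }
   ```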



##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java:
##########
@@ -151,23 +152,31 @@ private WriteBuilderWrapper(
         EncryptedOutputFile outputFile,
         WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
         VariantShreddingAnalyzer<D, S> variantAnalyzer,
-        Function<S, UnaryOperator<D>> copyFuncFactory) {
+        Function<S, UnaryOperator<D>> copyFuncFactory,
+        Class<S> schemaType) {
       this.internal = Parquet.write(outputFile);
       this.writerFunction = writerFunction;
       this.variantAnalyzer = variantAnalyzer;
       this.copyFuncFactory = copyFuncFactory;
+      this.schemaType = schemaType;
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public ModelWriteBuilder<D, S> schema(Schema newSchema) {
       this.schema = newSchema;
       internal.schema(newSchema);
+      if (this.engineSchema == null && Schema.class.equals(schemaType)) {
+        this.engineSchema = (S) newSchema;
+      }
       return this;
     }
 
     @Override
     public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
-      this.engineSchema = newSchema;
+      if (newSchema != null) {

Review Comment:
   This makes a public setter on `ModelWriteBuilder` silently ignore null, with 
nothing in the contract saying so. Any caller that passes null to reset/clear 
now gets a no-op and no signal.
   
   I think the cleaner home for this guard is 
`RegistryBasedFileWriterFactory.newDataWriter`: only call 
`.engineSchema(inputSchema())` when `inputSchema()` is non-null, and leave this 
setter as a plain assignment. That keeps the intended KC behavior (a null 
`inputSchema()` never clears the engine schema) without overloading the 
interface method. wdyt?
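   
   A minimal sketch of the shape, with invented stand-ins 
(`WriteBuilderSketch`, `WriterFactorySketch`, `configure`) rather than the real 
`ModelWriteBuilder` and factory classes. The setter stays a plain assignment 
and the caller decides whether to invoke it:
   
   ```java
   // Hypothetical stand-in for the builder: the setter stores whatever it is given, null included.
   final class WriteBuilderSketch<S> {
     private S engineSchema;

     WriteBuilderSketch<S> engineSchema(S newSchema) {
       this.engineSchema = newSchema;
       return this;
     }

     S engineSchema() {
       return engineSchema;
     }
   }

   // Hypothetical stand-in for the factory: the null check lives at the call site.
   final class WriterFactorySketch<S> {
     private final S inputSchema;

     WriterFactorySketch(S inputSchema) {
       this.inputSchema = inputSchema;
     }

     WriteBuilderSketch<S> configure(WriteBuilderSketch<S> builder) {
       if (inputSchema != null) {
         builder.engineSchema(inputSchema);
       }
       return builder;
     }
   }
   ```
   
   With this split, a caller that really does want to clear the value can 
still pass null to the setter and have it take effect.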



##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java:
##########
@@ -151,23 +152,31 @@ private WriteBuilderWrapper(
         EncryptedOutputFile outputFile,
         WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
         VariantShreddingAnalyzer<D, S> variantAnalyzer,
-        Function<S, UnaryOperator<D>> copyFuncFactory) {
+        Function<S, UnaryOperator<D>> copyFuncFactory,
+        Class<S> schemaType) {
       this.internal = Parquet.write(outputFile);
       this.writerFunction = writerFunction;
       this.variantAnalyzer = variantAnalyzer;
       this.copyFuncFactory = copyFuncFactory;
+      this.schemaType = schemaType;
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public ModelWriteBuilder<D, S> schema(Schema newSchema) {
       this.schema = newSchema;
       internal.schema(newSchema);
+      if (this.engineSchema == null && Schema.class.equals(schemaType)) {

Review Comment:
   The correctness of this whole change hinges on `schema()` being called 
before `engineSchema()`: `schema()` auto-derives the engine schema, then 
`engineSchema(null)` is dropped by the null-guard in `engineSchema()` below, so 
the derived value survives. Reverse the call order and the auto-derive sees a 
non-null `engineSchema` and skips. That ordering is invisible to anyone 
implementing the builder.
   
   I'd make this order-independent: keep an explicit `engineSchemaWasSet` flag, 
and do the auto-derive in `build()` only when the flag is false. Then 
`schema()`/`engineSchema()` can be called in either order and a real engine 
schema is never clobbered.
   
   Separately, using `Schema.class.equals(schemaType)` as the signal leaks 
schema-type semantics into the parquet layer — any future model that uses 
`Schema` as its engine-schema token inherits an auto-derive it might not want. 
An explicit flag on `create()` (or a `shouldDeriveEngineSchema()` hook) would 
be more honest than a class-identity check. Thoughts?
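   
   Something along these lines, as a sketch under assumptions: 
`OrderIndependentBuilderSketch` and `deriveEngineSchema` are invented names, 
`String` stands in for both schema types, and `build()` just returns the 
resolved engine schema. It also assumes callers only invoke `engineSchema(...)` 
when they have a value to set, which is what the call-site guard suggested on 
the setter would give us:
   
   ```java
   // Hypothetical sketch: the derive decision is deferred to build(), so setter order is irrelevant.
   final class OrderIndependentBuilderSketch {
     private final boolean deriveEngineSchema; // explicit opt-in, replaces the class-identity check
     private String schema;
     private String engineSchema;
     private boolean engineSchemaWasSet = false;

     OrderIndependentBuilderSketch(boolean deriveEngineSchema) {
       this.deriveEngineSchema = deriveEngineSchema;
     }

     OrderIndependentBuilderSketch schema(String newSchema) {
       this.schema = newSchema;
       return this;
     }

     OrderIndependentBuilderSketch engineSchema(String newSchema) {
       this.engineSchema = newSchema;
       this.engineSchemaWasSet = true;
       return this;
     }

     // Returns the engine schema the writer would end up using.
     String build() {
       if (!engineSchemaWasSet && deriveEngineSchema) {
         return schema;
       }
       return engineSchema;
     }
   }
   ```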



##########
data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalTestHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetFileTestUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.Variants;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestRecordVariantShreddingAnalyzer {
+
+  private static final Schema VARIANT_AFTER_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "v", Types.VariantType.get()));
+
+  private static final Schema VARIANT_BEFORE_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "v", Types.VariantType.get()),
+          Types.NestedField.required(2, "id", Types.LongType.get()));
+
+  private Variant variant;
+  private List<Record> records;
+
+  @TempDir private Path temp;
+
+  @BeforeEach
+  public void before() {
+    ByteBuffer metadataBuffer = 
VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true);
+    VariantMetadata metadata = Variants.metadata(metadataBuffer);
+    ByteBuffer objectBuffer =
+        VariantTestUtil.createObject(
+            metadataBuffer,
+            ImmutableMap.of(
+                "a", Variants.of(42),
+                "b", Variants.of("hello")));
+    variant = Variant.of(metadata, Variants.value(metadata, objectBuffer));
+
+    GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    records =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("id", 1L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 2L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 3L, "v", variant)));
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() {
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, 
VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);
+    GroupType typedValue = shreddedTypes.get(2).asGroupType();
+    assertThat(typedValue.getName()).isEqualTo("typed_value");
+    assertThat(typedValue.containsField("a")).isTrue();
+    assertThat(typedValue.containsField("b")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsWhenVariantIsFirstColumn() {
+    GenericRecord record = GenericRecord.create(VARIANT_BEFORE_ID_SCHEMA);
+    List<Record> variantFirstRecords =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("v", variant, "id", 1L)),
+            record.copy(ImmutableMap.of("v", variant, "id", 2L)));
+
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            variantFirstRecords, VARIANT_BEFORE_ID_SCHEMA, 
VARIANT_BEFORE_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(1);
+    assertThat(shreddedTypes.get(1).asGroupType().containsField("a")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsSkipsNullVariantValues() {
+    GenericRecord withVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withVariant.setField("id", 1L);
+    withVariant.setField("v", variant);
+
+    GenericRecord withNullVariant = 
GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withNullVariant.setField("id", 2L);
+    withNullVariant.setField("v", null);
+
+    GenericRecord withVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withVariant2.setField("id", 3L);
+    withVariant2.setField("v", variant);
+
+    List<Record> recordsWithNulls = ImmutableList.of(withVariant, 
withNullVariant, withVariant2);
+
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            recordsWithNulls, VARIANT_AFTER_ID_SCHEMA, 
VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);
+    assertThat(shreddedTypes.get(2).asGroupType().containsField("a")).isTrue();
+    assertThat(shreddedTypes.get(2).asGroupType().containsField("b")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsWithAllNullVariantValues() {
+    GenericRecord nullVariant1 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    nullVariant1.setField("id", 1L);
+    nullVariant1.setField("v", null);
+
+    GenericRecord nullVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    nullVariant2.setField("id", 2L);
+    nullVariant2.setField("v", null);
+
+    List<Record> allNullVariants = ImmutableList.of(nullVariant1, 
nullVariant2);
+
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            allNullVariants, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).isEmpty();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsRejectsNonVariantValues() {
+    GenericRecord invalidRecord = 
GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    invalidRecord.setField("id", 1L);
+    invalidRecord.setField("v", "not-a-variant");
+
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+
+    assertThatThrownBy(
+            () ->
+                analyzer.analyzeVariantColumns(
+                    ImmutableList.of(invalidRecord),
+                    VARIANT_AFTER_ID_SCHEMA,
+                    VARIANT_AFTER_ID_SCHEMA))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Expected Variant at index 1 but was: 
java.lang.String");
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsRejectsNullEngineSchema() {
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+
+    assertThatThrownBy(() -> analyzer.analyzeVariantColumns(records, 
VARIANT_AFTER_ID_SCHEMA, null))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessageContaining("Invalid engine schema: null");
+  }
+
+  @Test
+  public void testGenericFileWriterFactoryShreddingRoundTrip() throws 
IOException {
+    Table table =
+        TestTables.create(
+            temp.resolve("table").toFile(),
+            "variant",
+            VARIANT_AFTER_ID_SCHEMA,
+            PartitionSpec.unpartitioned(),
+            3);
+    try {
+      GenericFileWriterFactory writerFactory =
+          new GenericFileWriterFactory.Builder(table)
+              .dataFileFormat(FileFormat.PARQUET)
+              .dataSchema(VARIANT_AFTER_ID_SCHEMA)
+              .writerProperties(
+                  ImmutableMap.of(
+                      TableProperties.PARQUET_SHRED_VARIANTS, "true",
+                      TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2"))
+              .build();
+
+      OutputFileFactory fileFactory =
+          OutputFileFactory.builderFor(table, 1, 
1).format(FileFormat.PARQUET).build();
+      EncryptedOutputFile encryptedOutputFile = fileFactory.newOutputFile();
+
+      // KC path: RegistryBasedFileWriterFactory passes inputSchema=null as engineSchema.
+      try (DataWriter<Record> writer =
+          writerFactory.newDataWriter(encryptedOutputFile, table.spec(), 
null)) {
+        for (Record rec : records) {
+          writer.write(rec);
+        }
+      }
+
+      OutputFile outputFile = encryptedOutputFile.encryptingOutputFile();
+      try (ParquetFileReader reader =
+          
ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) {
+        
assertShreddedVariantParquetSchema(reader.getFooter().getFileMetaData().getSchema());
+      }
+
+      assertAllRawParquetRowsShredded(outputFile);
+      assertRecordsRoundTrip(outputFile);
+    } finally {
+      TestTables.clearTables();
+    }
+  }
+
+  @Test
+  public void testFormatModelRegistryShreddingRoundTrip() throws IOException {
+    OutputFile outputFile = 
Files.localOutput(temp.resolve("variant-shredded.parquet").toFile());
+    EncryptedOutputFile encryptedOutputFile = 
EncryptedFiles.plainAsEncryptedOutput(outputFile);
+
+    FileWriterBuilder<DataWriter<Record>, Object> writeBuilder =
+        FormatModelRegistry.dataWriteBuilder(FileFormat.PARQUET, Record.class, 
encryptedOutputFile);
+
+    try (DataWriter<Record> writer =
+        writeBuilder
+            .schema(VARIANT_AFTER_ID_SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
+            .setAll(
+                ImmutableMap.of(
+                    TableProperties.PARQUET_SHRED_VARIANTS, "true",
+                    TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2"))
+            .build()) {
+      for (Record rec : records) {
+        writer.write(rec);
+      }
+    }
+
+    try (ParquetFileReader reader =
+        
ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) {
+      
assertShreddedVariantParquetSchema(reader.getFooter().getFileMetaData().getSchema());
+    }
+
+    assertAllRawParquetRowsShredded(outputFile);
+    assertRecordsRoundTrip(outputFile);
+  }
+
+  private void assertShreddedVariantParquetSchema(MessageType parquetSchema) {
+    GroupType variantGroup = parquetSchema.getType("v").asGroupType();
+    assertThat(variantGroup.containsField("typed_value")).isTrue();
+
+    GroupType typedValue = variantGroup.getType("typed_value").asGroupType();
+    assertThat(typedValue.containsField("a")).isTrue();
+    assertThat(typedValue.containsField("b")).isTrue();
+  }
+
+  private void assertShreddedTypedValueOnRow(Group row) {
+    Group variantData = row.getGroup("v", 0);
+    assertThat(variantData.getFieldRepetitionCount("value")).isEqualTo(0);
+
+    Group typedValue = variantData.getGroup("typed_value", 0);
+    assertThat(typedValue.getGroup("a", 0).getInteger("typed_value", 
0)).isEqualTo(42);
+    assertThat(typedValue.getGroup("b", 0).getString("typed_value", 
0)).isEqualTo("hello");
+  }
+
+  private void assertAllRawParquetRowsShredded(OutputFile outputFile) throws IOException {
+    try (ParquetReader<Group> rawReader =
+        ParquetReader.builder(
+                new GroupReadSupport(), new org.apache.hadoop.fs.Path(outputFile.location()))

Review Comment:
   `org.apache.hadoop.fs.Path` is fully qualified inline to dodge the 
`java.nio.file.Path` import used for `@TempDir`. The idiomatic fix is to import 
`org.apache.hadoop.fs.Path` and declare the temp dir as `@TempDir private 
java.nio.file.Path temp;` — same trick applies to the inline 
`org.apache.iceberg.data.parquet.GenericParquetReaders` in 
`assertRecordsRoundTrip`. Minor, but it cleans up two FQNs.



##########
data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalTestHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetFileTestUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.Variants;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestRecordVariantShreddingAnalyzer {
+
+  private static final Schema VARIANT_AFTER_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "v", Types.VariantType.get()));
+
+  private static final Schema VARIANT_BEFORE_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "v", Types.VariantType.get()),
+          Types.NestedField.required(2, "id", Types.LongType.get()));
+
+  private Variant variant;
+  private List<Record> records;
+
+  @TempDir private Path temp;
+
+  @BeforeEach
+  public void before() {
+    ByteBuffer metadataBuffer = 
VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true);
+    VariantMetadata metadata = Variants.metadata(metadataBuffer);
+    ByteBuffer objectBuffer =
+        VariantTestUtil.createObject(
+            metadataBuffer,
+            ImmutableMap.of(
+                "a", Variants.of(42),
+                "b", Variants.of("hello")));
+    variant = Variant.of(metadata, Variants.value(metadata, objectBuffer));
+
+    GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    records =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("id", 1L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 2L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 3L, "v", variant)));
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() {
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, 
VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);
+    GroupType typedValue = shreddedTypes.get(2).asGroupType();
+    assertThat(typedValue.getName()).isEqualTo("typed_value");
+    assertThat(typedValue.containsField("a")).isTrue();
+    assertThat(typedValue.containsField("b")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsWhenVariantIsFirstColumn() {
+    GenericRecord record = GenericRecord.create(VARIANT_BEFORE_ID_SCHEMA);
+    List<Record> variantFirstRecords =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("v", variant, "id", 1L)),
+            record.copy(ImmutableMap.of("v", variant, "id", 2L)));
+
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            variantFirstRecords, VARIANT_BEFORE_ID_SCHEMA, 
VARIANT_BEFORE_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(1);
+    assertThat(shreddedTypes.get(1).asGroupType().containsField("a")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsSkipsNullVariantValues() {
+    GenericRecord withVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withVariant.setField("id", 1L);
+    withVariant.setField("v", variant);
+
+    GenericRecord withNullVariant = 
GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withNullVariant.setField("id", 2L);
+    withNullVariant.setField("v", null);
+
+    GenericRecord withVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withVariant2.setField("id", 3L);
+    withVariant2.setField("v", variant);
+
+    List<Record> recordsWithNulls = ImmutableList.of(withVariant, 
withNullVariant, withVariant2);
+
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            recordsWithNulls, VARIANT_AFTER_ID_SCHEMA, 
VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);
+    assertThat(shreddedTypes.get(2).asGroupType().containsField("a")).isTrue();
+    assertThat(shreddedTypes.get(2).asGroupType().containsField("b")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsWithAllNullVariantValues() {
+    GenericRecord nullVariant1 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    nullVariant1.setField("id", 1L);
+    nullVariant1.setField("v", null);
+
+    GenericRecord nullVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    nullVariant2.setField("id", 2L);
+    nullVariant2.setField("v", null);
+
+    List<Record> allNullVariants = ImmutableList.of(nullVariant1, 
nullVariant2);
+
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            allNullVariants, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).isEmpty();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsRejectsNonVariantValues() {
+    GenericRecord invalidRecord = 
GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    invalidRecord.setField("id", 1L);
+    invalidRecord.setField("v", "not-a-variant");
+
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+
+    assertThatThrownBy(
+            () ->
+                analyzer.analyzeVariantColumns(
+                    ImmutableList.of(invalidRecord),
+                    VARIANT_AFTER_ID_SCHEMA,
+                    VARIANT_AFTER_ID_SCHEMA))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Expected Variant at index 1 but was: 
java.lang.String");
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsRejectsNullEngineSchema() {
+    RecordVariantShreddingAnalyzer analyzer = new 
RecordVariantShreddingAnalyzer();
+
+    assertThatThrownBy(() -> analyzer.analyzeVariantColumns(records, 
VARIANT_AFTER_ID_SCHEMA, null))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessageContaining("Invalid engine schema: null");
+  }
+
+  @Test
+  public void testGenericFileWriterFactoryShreddingRoundTrip() throws 
IOException {
+    Table table =
+        TestTables.create(
+            temp.resolve("table").toFile(),
+            "variant",
+            VARIANT_AFTER_ID_SCHEMA,
+            PartitionSpec.unpartitioned(),
+            3);
+    try {
+      GenericFileWriterFactory writerFactory =
+          new GenericFileWriterFactory.Builder(table)
+              .dataFileFormat(FileFormat.PARQUET)
+              .dataSchema(VARIANT_AFTER_ID_SCHEMA)
+              .writerProperties(
+                  ImmutableMap.of(
+                      TableProperties.PARQUET_SHRED_VARIANTS, "true",
+                      TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2"))
+              .build();
+
+      OutputFileFactory fileFactory =
+          OutputFileFactory.builderFor(table, 1, 
1).format(FileFormat.PARQUET).build();
+      EncryptedOutputFile encryptedOutputFile = fileFactory.newOutputFile();
+
+      // KC path: RegistryBasedFileWriterFactory passes inputSchema=null as engineSchema.

Review Comment:
   The comment points at the wrong null — the third arg here is the 
`StructLike` partition, not the engine schema. The engine-schema null comes 
from `GenericFileWriterFactory` not forwarding an `inputSchema` to the 
superclass, so `inputSchema()` returns null and 
`RegistryBasedFileWriterFactory` calls `builder.engineSchema(null)`. I'd reword 
so a reader doesn't go looking for the null in the partition arg.



##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java:
##########
@@ -151,23 +152,31 @@ private WriteBuilderWrapper(
         EncryptedOutputFile outputFile,
         WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
         VariantShreddingAnalyzer<D, S> variantAnalyzer,
-        Function<S, UnaryOperator<D>> copyFuncFactory) {
+        Function<S, UnaryOperator<D>> copyFuncFactory,
+        Class<S> schemaType) {
       this.internal = Parquet.write(outputFile);
       this.writerFunction = writerFunction;
       this.variantAnalyzer = variantAnalyzer;
       this.copyFuncFactory = copyFuncFactory;
+      this.schemaType = schemaType;
     }
 
     @Override
+    @SuppressWarnings("unchecked")

Review Comment:
   The only unchecked cast is `(S) newSchema` on the auto-derive line. I'd 
narrow the suppression to that statement (or a tiny helper) rather than the 
whole method, so a future cast in here doesn't get silently covered.
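   
   For illustration only, with an invented `NarrowSuppressionSketch` class, an 
`Object` parameter in place of the Iceberg `Schema`, and `isInstance` as a 
stand-in for the real type check. The annotation sits on the one local 
declaration that holds the cast, so the rest of the method is still checked:
   
   ```java
   // Hypothetical sketch: @SuppressWarnings scoped to a single local variable declaration.
   final class NarrowSuppressionSketch<S> {
     private final Class<S> schemaType;
     private S engineSchema;

     NarrowSuppressionSketch(Class<S> schemaType) {
       this.schemaType = schemaType;
     }

     NarrowSuppressionSketch<S> schema(Object newSchema) {
       if (engineSchema == null && schemaType.isInstance(newSchema)) {
         // Only this declaration is exempt from unchecked warnings.
         @SuppressWarnings("unchecked")
         S derived = (S) newSchema;
         this.engineSchema = derived;
       }
       return this;
     }

     S engineSchema() {
       return engineSchema;
     }
   }
   ```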



##########
data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalTestHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetFileTestUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.Variants;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestRecordVariantShreddingAnalyzer {
+
+  private static final Schema VARIANT_AFTER_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "v", Types.VariantType.get()));
+
+  private static final Schema VARIANT_BEFORE_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "v", Types.VariantType.get()),
+          Types.NestedField.required(2, "id", Types.LongType.get()));
+
+  private Variant variant;
+  private List<Record> records;
+
+  @TempDir private Path temp;
+
+  @BeforeEach
+  public void before() {
+    ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true);
+    VariantMetadata metadata = Variants.metadata(metadataBuffer);
+    ByteBuffer objectBuffer =
+        VariantTestUtil.createObject(
+            metadataBuffer,
+            ImmutableMap.of(
+                "a", Variants.of(42),
+                "b", Variants.of("hello")));
+    variant = Variant.of(metadata, Variants.value(metadata, objectBuffer));
+
+    GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    records =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("id", 1L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 2L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 3L, "v", variant)));
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() {
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);
+    GroupType typedValue = shreddedTypes.get(2).asGroupType();
+    assertThat(typedValue.getName()).isEqualTo("typed_value");
+    assertThat(typedValue.containsField("a")).isTrue();
+    assertThat(typedValue.containsField("b")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsWhenVariantIsFirstColumn() {
+    GenericRecord record = GenericRecord.create(VARIANT_BEFORE_ID_SCHEMA);
+    List<Record> variantFirstRecords =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("v", variant, "id", 1L)),
+            record.copy(ImmutableMap.of("v", variant, "id", 2L)));
+
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            variantFirstRecords, VARIANT_BEFORE_ID_SCHEMA, VARIANT_BEFORE_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(1);
+    assertThat(shreddedTypes.get(1).asGroupType().containsField("a")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsSkipsNullVariantValues() {
+    GenericRecord withVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withVariant.setField("id", 1L);
+    withVariant.setField("v", variant);
+
+    GenericRecord withNullVariant = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withNullVariant.setField("id", 2L);
+    withNullVariant.setField("v", null);
+
+    GenericRecord withVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    withVariant2.setField("id", 3L);
+    withVariant2.setField("v", variant);
+
+    List<Record> recordsWithNulls = ImmutableList.of(withVariant, withNullVariant, withVariant2);
+
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            recordsWithNulls, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);
+    assertThat(shreddedTypes.get(2).asGroupType().containsField("a")).isTrue();
+    assertThat(shreddedTypes.get(2).asGroupType().containsField("b")).isTrue();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsWithAllNullVariantValues() {
+    GenericRecord nullVariant1 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    nullVariant1.setField("id", 1L);
+    nullVariant1.setField("v", null);
+
+    GenericRecord nullVariant2 = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    nullVariant2.setField("id", 2L);
+    nullVariant2.setField("v", null);
+
+    List<Record> allNullVariants = ImmutableList.of(nullVariant1, nullVariant2);
+
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(
+            allNullVariants, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).isEmpty();
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsRejectsNonVariantValues() {
+    GenericRecord invalidRecord = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    invalidRecord.setField("id", 1L);
+    invalidRecord.setField("v", "not-a-variant");
+
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+
+    assertThatThrownBy(
+            () ->
+                analyzer.analyzeVariantColumns(
+                    ImmutableList.of(invalidRecord),
+                    VARIANT_AFTER_ID_SCHEMA,
+                    VARIANT_AFTER_ID_SCHEMA))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Expected Variant at index 1 but was: java.lang.String");
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsRejectsNullEngineSchema() {
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+
+    assertThatThrownBy(() -> analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, null))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessageContaining("Invalid engine schema: null");
+  }
+
+  @Test
+  public void testGenericFileWriterFactoryShreddingRoundTrip() throws IOException {
+    Table table =
+        TestTables.create(
+            temp.resolve("table").toFile(),
+            "variant",
+            VARIANT_AFTER_ID_SCHEMA,
+            PartitionSpec.unpartitioned(),
+            3);
+    try {
+      GenericFileWriterFactory writerFactory =
+          new GenericFileWriterFactory.Builder(table)
+              .dataFileFormat(FileFormat.PARQUET)
+              .dataSchema(VARIANT_AFTER_ID_SCHEMA)
+              .writerProperties(
+                  ImmutableMap.of(
+                      TableProperties.PARQUET_SHRED_VARIANTS, "true",
+                      TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2"))
+              .build();
+
+      OutputFileFactory fileFactory =
+          OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();
+      EncryptedOutputFile encryptedOutputFile = fileFactory.newOutputFile();
+
+      // KC path: RegistryBasedFileWriterFactory passes inputSchema=null as engineSchema.
+      try (DataWriter<Record> writer =
+          writerFactory.newDataWriter(encryptedOutputFile, table.spec(), null)) {
+        for (Record rec : records) {
+          writer.write(rec);
+        }
+      }
+
+      OutputFile outputFile = encryptedOutputFile.encryptingOutputFile();
+      try (ParquetFileReader reader =
+          ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) {
+        assertShreddedVariantParquetSchema(reader.getFooter().getFileMetaData().getSchema());
+      }
+
+      assertAllRawParquetRowsShredded(outputFile);
+      assertRecordsRoundTrip(outputFile);
+    } finally {
+      TestTables.clearTables();
+    }
+  }
+
+  @Test
+  public void testFormatModelRegistryShreddingRoundTrip() throws IOException {

Review Comment:
   Both round-trip tests only exercise the auto-derive path — neither ever 
calls `engineSchema()` with a real, non-null `Schema`. Given the 
`schema()`/`engineSchema()` precedence logic is the riskiest part of this PR, 
I'd add a `WriteBuilderWrapper`-level test that sets an explicit engine schema 
and then calls `schema(...)`, asserting the explicit one wins (and ideally one 
that calls them in the reverse order). That's the case that pins down the 
protocol we're relying on.
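
   To make the shape of that test concrete, here is a self-contained model of the protocol. It is only a sketch: `EngineSchemaPrecedenceSketch` is hypothetical, `String` stands in for both schema types, and the rule it encodes (explicit non-null engine schema wins in either call order, `engineSchema(null)` leaves auto-derivation on) is my reading of the intended behaviour rather than something confirmed by the PR code. The real test would drive `WriteBuilderWrapper` instead.

```java
// Hypothetical model of the builder protocol; String stands in for both schema types.
final class EngineSchemaPrecedenceSketch {
  private String engineSchema;
  private boolean explicitlySet;

  // Assumed rule: a non-null engine schema is explicit; null leaves auto-derivation enabled.
  EngineSchemaPrecedenceSketch engineSchema(String schema) {
    if (schema != null) {
      this.engineSchema = schema;
      this.explicitlySet = true;
    }
    return this;
  }

  // Assumed rule: schema(...) only auto-derives when nothing explicit was supplied.
  EngineSchemaPrecedenceSketch schema(String icebergSchema) {
    if (!explicitlySet) {
      this.engineSchema = icebergSchema;
    }
    return this;
  }

  String resolvedEngineSchema() {
    return engineSchema;
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    // Explicit first, then schema(...): the explicit engine schema must survive.
    String first =
        new EngineSchemaPrecedenceSketch().engineSchema("explicit").schema("iceberg")
            .resolvedEngineSchema();
    check("explicit".equals(first), "explicit engine schema should win when set first");

    // Reverse order: schema(...) auto-derives, then the explicit call overrides it.
    String second =
        new EngineSchemaPrecedenceSketch().schema("iceberg").engineSchema("explicit")
            .resolvedEngineSchema();
    check("explicit".equals(second), "explicit engine schema should win when set last");

    // The engineSchema(null) path: auto-derivation still applies.
    String third =
        new EngineSchemaPrecedenceSketch().engineSchema(null).schema("iceberg")
            .resolvedEngineSchema();
    check("iceberg".equals(third), "null engine schema should fall back to auto-derive");

    System.out.println("precedence checks passed");
  }
}
```

   The three cases in `main` are the ones I'd want pinned against the real builder: explicit-then-schema, schema-then-explicit, and null-then-schema.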



##########
data/src/main/java/org/apache/iceberg/data/RecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.parquet.VariantShreddingAnalyzer;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantValue;
+
+/**
+ * Generic {@link Record} implementation that extracts variant values from {@link Record#get(int)}
+ * using positional indices aligned with {@link Schema#columns()}.
+ *
+ * <p>Buffered rows must be laid out against the same {@link Schema} passed as {@code engineSchema};
+ * otherwise {@link Record#get(int)} positions will not match the resolved column indices.
+ */
+class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Schema> {
+
+  private final Map<Schema, Map<String, Integer>> columnIndicesBySchema = Maps.newHashMap();
+
+  RecordVariantShreddingAnalyzer() {}
+
+  @Override
+  protected int resolveColumnIndex(Schema engineSchema, String columnName) {
+    Preconditions.checkNotNull(engineSchema, "Invalid engine schema: null");
+
+    Map<String, Integer> indices =
+        columnIndicesBySchema.computeIfAbsent(
+            engineSchema, RecordVariantShreddingAnalyzer::indexByName);
+    Integer index = indices.get(columnName);
+    return index != null ? index : -1;

Review Comment:
   When `resolveColumnIndex` returns `-1` the base class quietly skips 
shredding for that column. If an engine schema and the Iceberg schema disagree 
on a name (a rename, say), we'd silently write unshredded with no diagnostic. A 
`LOG.warn` here with the column name would make that failure mode discoverable 
instead of a silent fallback. Not blocking, but without the warning this is easy 
to lose a day debugging.
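
   Roughly what I have in mind, as a standalone sketch. Assumptions: `ColumnIndexSketch` is a hypothetical stand-in, a `List<String>` of column names replaces `Schema`, and `java.util.logging` is used only to keep the example dependency-free (the real class would use whatever logger the module already uses).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical stand-in for the analyzer; a list of column names replaces Schema.
final class ColumnIndexSketch {
  private static final Logger LOG = Logger.getLogger(ColumnIndexSketch.class.getName());

  private final Map<List<String>, Map<String, Integer>> indicesBySchema = new HashMap<>();

  int resolveColumnIndex(List<String> engineColumns, String columnName) {
    Map<String, Integer> indices =
        indicesBySchema.computeIfAbsent(engineColumns, ColumnIndexSketch::indexByName);
    Integer index = indices.get(columnName);
    if (index == null) {
      // Surface the mismatch instead of silently falling back to unshredded output.
      LOG.warning(
          () ->
              "Cannot resolve variant column '"
                  + columnName
                  + "' in engine schema "
                  + engineColumns
                  + "; the column will not be shredded");
      return -1;
    }
    return index;
  }

  // Maps each column name to its position in the schema.
  static Map<String, Integer> indexByName(List<String> columns) {
    Map<String, Integer> indices = new HashMap<>();
    for (int pos = 0; pos < columns.size(); pos++) {
      indices.put(columns.get(pos), pos);
    }
    return indices;
  }

  public static void main(String[] args) {
    ColumnIndexSketch sketch = new ColumnIndexSketch();
    List<String> engineColumns = List.of("id", "v");
    System.out.println(sketch.resolveColumnIndex(engineColumns, "v"));
    System.out.println(sketch.resolveColumnIndex(engineColumns, "v_renamed"));
  }
}
```

   Since the lookup runs once per variant column per analysis rather than per row, the warning shouldn't be noisy.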



##########
data/src/test/java/org/apache/iceberg/data/TestRecordVariantShreddingAnalyzer.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalTestHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetFileTestUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.Variants;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestRecordVariantShreddingAnalyzer {
+
+  private static final Schema VARIANT_AFTER_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "v", Types.VariantType.get()));
+
+  private static final Schema VARIANT_BEFORE_ID_SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "v", Types.VariantType.get()),
+          Types.NestedField.required(2, "id", Types.LongType.get()));
+
+  private Variant variant;
+  private List<Record> records;
+
+  @TempDir private Path temp;
+
+  @BeforeEach
+  public void before() {
+    ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true);
+    VariantMetadata metadata = Variants.metadata(metadataBuffer);
+    ByteBuffer objectBuffer =
+        VariantTestUtil.createObject(
+            metadataBuffer,
+            ImmutableMap.of(
+                "a", Variants.of(42),
+                "b", Variants.of("hello")));
+    variant = Variant.of(metadata, Variants.value(metadata, objectBuffer));
+
+    GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA);
+    records =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("id", 1L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 2L, "v", variant)),
+            record.copy(ImmutableMap.of("id", 3L, "v", variant)));
+  }
+
+  @Test
+  public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() {
+    RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer();
+
+    Map<Integer, Type> shreddedTypes =
+        analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, VARIANT_AFTER_ID_SCHEMA);
+
+    assertThat(shreddedTypes).containsOnlyKeys(2);

Review Comment:
   Every test here shreds exactly one variant column. Could we add one with two 
variant columns at non-adjacent positions (say `id LONG, v1 VARIANT, other 
STRING, v2 VARIANT`) and assert both field IDs come back shredded? That's the 
case that would catch an off-by-one in `indexByName` or an "only the first 
variant gets shredded" regression; the current single-column tests can't.
   
   While we're here, the literal `2` is the field ID of `v`; pulling it into a 
named constant would read better than the bare key.
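
   As a dependency-free illustration of the assertion shape (not the actual test): `TwoVariantColumnsSketch` and its parallel arrays are hypothetical stand-ins for the four-column schema, and `V1_FIELD_ID` / `V2_FIELD_ID` show the named-constant idea. The real test would build the schema with `Types.NestedField` and assert on the map returned by `analyzeVariantColumns`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model: parallel arrays stand in for a four-column Iceberg schema.
final class TwoVariantColumnsSketch {
  static final int V1_FIELD_ID = 2;
  static final int V2_FIELD_ID = 4;

  // Models: id LONG, v1 VARIANT, other STRING, v2 VARIANT.
  static final int[] FIELD_IDS = {1, V1_FIELD_ID, 3, V2_FIELD_ID};
  static final String[] TYPES = {"long", "variant", "string", "variant"};

  // Returns field ID -> column position for every variant column, in schema order.
  static Map<Integer, Integer> variantPositionsByFieldId(int[] fieldIds, String[] types) {
    Map<Integer, Integer> positions = new LinkedHashMap<>();
    for (int pos = 0; pos < types.length; pos++) {
      if ("variant".equals(types[pos])) {
        positions.put(fieldIds[pos], pos);
      }
    }
    return positions;
  }

  public static void main(String[] args) {
    Map<Integer, Integer> positions = variantPositionsByFieldId(FIELD_IDS, TYPES);
    // Both variant columns must be present, each at its own non-adjacent position.
    if (!positions.keySet().equals(java.util.Set.of(V1_FIELD_ID, V2_FIELD_ID))) {
      throw new AssertionError("expected both variant field IDs, got " + positions.keySet());
    }
    if (positions.get(V1_FIELD_ID) != 1 || positions.get(V2_FIELD_ID) != 3) {
      throw new AssertionError("unexpected positions: " + positions);
    }
    System.out.println(positions);
  }
}
```

   The equivalent AssertJ check in the real test would be along the lines of `containsOnlyKeys(V1_FIELD_ID, V2_FIELD_ID)` on the returned map.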



##########
data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java:
##########
@@ -44,11 +47,13 @@ public static void register() {
     FormatModelRegistry.register(
         ParquetFormatModel.create(
             Record.class,
-            Void.class,
+            Schema.class,
             (icebergSchema, fileSchema, engineSchema) ->
                 GenericParquetWriter.create(icebergSchema, fileSchema),
             (icebergSchema, fileSchema, engineSchema, idToConstant) ->
-                GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+                GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant),
+            new RecordVariantShreddingAnalyzer(),
+            (Function<Schema, UnaryOperator<Record>>) engineSchema -> Record::copy));

Review Comment:
   The lambda takes `engineSchema` and ignores it, returning `Record::copy` 
unconditionally. That's fine for this path, but the `Function<S, 
UnaryOperator<D>>` abstraction exists so engines can produce a schema-aware 
copy — here it's pure indirection, and the parameter name reads as if it's used.
   
   If the generic path will never use the engine schema for copying, could we 
pass a plain `UnaryOperator<Record>` and drop the factory wrapper? If the 
factory has to stay for signature reasons, I'd rename the parameter to 
`ignored` so it doesn't mislead. wdyt?
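
   If the factory signature has to stay, one way to keep call sites honest is a small adapter that lifts a plain copy operator into the factory shape. This is only a sketch: `CopyFactorySketch` and `ignoringSchema` are hypothetical names, and a `List<String>` stands in for `Record`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical helper; List<String> stands in for Record, String for the engine schema.
final class CopyFactorySketch {
  // Lifts a schema-independent copy into the Function<S, UnaryOperator<D>> shape.
  // The parameter is named "ignored" so nobody reads it as being used.
  static <S, D> Function<S, UnaryOperator<D>> ignoringSchema(UnaryOperator<D> copy) {
    return ignored -> copy;
  }

  public static void main(String[] args) {
    UnaryOperator<List<String>> copy = row -> new ArrayList<>(row);
    Function<String, UnaryOperator<List<String>>> factory = ignoringSchema(copy);

    List<String> original = new ArrayList<>(List.of("a", "b"));
    List<String> copied = factory.apply("any-engine-schema").apply(original);

    // The copy is equal in content but is a distinct object.
    System.out.println(copied.equals(original) && copied != original);
  }
}
```

   With something like this the registration would read as "copy, ignoring the schema" rather than a cast lambda with an unused parameter.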



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

