This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 6364aaae20 Data: Add TCK tests for Schema Evolution  in 
BaseFormatModelTests (#15843)
6364aaae20 is described below

commit 6364aaae20c50b8bfeb550ec89b3b14feedd51b0
Author: GuoYu <[email protected]>
AuthorDate: Mon May 11 18:36:36 2026 +0800

    Data: Add TCK tests for Schema Evolution  in BaseFormatModelTests (#15843)
---
 build.gradle                                       |   9 +
 .../org/apache/iceberg/avro/AvroTestHelpers.java   |   8 +
 .../apache/iceberg/data/BaseFormatModelTests.java  | 450 +++++++++++++++++++++
 flink/v1.20/build.gradle                           |   2 +
 flink/v2.0/build.gradle                            |   2 +
 flink/v2.1/build.gradle                            |   2 +
 .../apache/iceberg/orc/OrcWritingTestUtils.java    |  35 ++
 .../org/apache/iceberg/orc/TestORCSchemaUtil.java  |   8 +
 .../iceberg/parquet/ParquetWritingTestUtils.java   |   2 +-
 .../iceberg/parquet/ParquetFileTestUtils.java      |  36 ++
 spark/v3.4/build.gradle                            |   2 +
 spark/v3.5/build.gradle                            |   1 +
 spark/v4.0/build.gradle                            |   1 +
 spark/v4.1/build.gradle                            |   1 +
 14 files changed, 558 insertions(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index 261dfabf04..fca32be9dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -457,6 +457,8 @@ project(':iceberg-data') {
 
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 
'testArtifacts')
+    testImplementation(testFixtures(project(':iceberg-parquet')))
   }
 
   test {
@@ -939,6 +941,13 @@ project(':iceberg-parquet') {
       exclude group: 'org.apache.avro', module: 'avro'
     }
 
+    testFixturesApi(libs.parquet.hadoop) {
+      exclude group: 'org.apache.avro', module: 'avro'
+      // already shaded by Parquet
+      exclude group: 'it.unimi.dsi'
+      exclude group: 'org.codehaus.jackson'
+    }
+
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
   }
diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java 
b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
index 0a1cf43f4f..fd73706ce0 100644
--- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
+++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
@@ -177,4 +177,12 @@ public class AvroTestHelpers {
       return reader.getMetaString("avro.codec");
     }
   }
+
+  public static boolean hasIds(Schema schema) {
+    return AvroSchemaUtil.hasIds(schema);
+  }
+
+  public static Schema removeIds(org.apache.iceberg.Schema schema) {
+    return RemoveIds.removeIds(schema);
+  }
 }
diff --git 
a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java 
b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
index d0b8e3161b..a38b025e0f 100644
--- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assumptions.assumeFalse;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -38,6 +39,14 @@ import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.IntStream;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -52,6 +61,8 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestTables;
+import org.apache.iceberg.avro.AvroTestHelpers;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -70,6 +81,15 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcWritingTestUtils;
+import org.apache.iceberg.orc.TestORCSchemaUtil;
+import org.apache.iceberg.parquet.ParquetFileTestUtils;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -77,6 +97,14 @@ import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
@@ -1330,6 +1358,280 @@ public abstract class BaseFormatModelTests<T> {
     readAndAssertGenericRecords(fileFormat, schema, 
sourceRecords.stream().map(transform).toList());
   }
 
+  /**
+   * Schema evolution: Adding column (reading with wider schema). Write with 
DefaultSchema, read
+   * with additional optional columns. The new columns should be filled with 
null values.
+   */
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionAddColumn(FileFormat fileFormat) throws IOException {
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema writeSchema = dataGenerator.schema();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+    List<Types.NestedField> evolvedColumns = 
Lists.newArrayList(writeSchema.columns());
+
+    int maxFieldId =
+        
writeSchema.columns().stream().mapToInt(Types.NestedField::fieldId).max().orElse(0);
+    evolvedColumns.add(
+        Types.NestedField.optional("new_string_col")
+            .withId(maxFieldId + 1)
+            .ofType(Types.StringType.get())
+            .build());
+    evolvedColumns.add(
+        Types.NestedField.optional("new_int_col")
+            .withId(maxFieldId + 2)
+            .ofType(Types.IntegerType.get())
+            .build());
+    Schema readSchema = new Schema(evolvedColumns);
+    readAndAssertEngineRecords(
+        fileFormat,
+        readSchema,
+        genericRecords,
+        record -> {
+          Record expected = copy(record, writeSchema, readSchema);
+
+          expected.setField("new_string_col", null);
+          expected.setField("new_int_col", null);
+          return expected;
+        });
+  }
+
+  /**
+   * Schema evolution: Projection / Removing column (reading with narrower schema). Write with
+   * DefaultSchema, read with only the first and last columns (skipping the middle columns).
+   */
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionProjection(FileFormat fileFormat) throws IOException 
{
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema writeSchema = dataGenerator.schema();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+    List<Types.NestedField> writeColumns = writeSchema.columns();
+    assumeThat(writeColumns).hasSizeGreaterThanOrEqualTo(2);
+    Schema projectedSchema =
+        new Schema(writeColumns.get(0), writeColumns.get(writeColumns.size() - 
1));
+
+    readAndAssertEngineRecords(
+        fileFormat,
+        projectedSchema,
+        genericRecords,
+        record -> copy(record, projectedSchema, projectedSchema));
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionDropAndReAddSameNameColumn(FileFormat fileFormat) 
throws IOException {
+
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema writeSchema = dataGenerator.schema();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+    // Remove col_b and add a new col_b with a different field ID
+    Schema readSchema =
+        new Schema(
+            Types.NestedField.required(1, "col_a", Types.StringType.get()),
+            Types.NestedField.optional(6, "col_b", Types.IntegerType.get()),
+            Types.NestedField.required(3, "col_c", Types.LongType.get()),
+            Types.NestedField.required(4, "col_d", Types.FloatType.get()),
+            Types.NestedField.required(5, "col_e", Types.DoubleType.get()));
+
+    readAndAssertEngineRecords(
+        fileFormat,
+        readSchema,
+        genericRecords,
+        record -> {
+          Record expected = GenericRecord.create(readSchema);
+          expected.setField("col_a", record.getField("col_a"));
+          expected.setField("col_b", null);
+          expected.setField("col_c", record.getField("col_c"));
+          expected.setField("col_d", record.getField("col_d"));
+          expected.setField("col_e", record.getField("col_e"));
+          return expected;
+        });
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionTypePromotionIntToLong(FileFormat fileFormat) throws 
IOException {
+    runTypePromotionCheck(
+        fileFormat,
+        Types.IntegerType.get(),
+        Types.LongType.get(),
+        value -> value == null ? null : ((Integer) value).longValue());
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionTypePromotionFloatToDouble(FileFormat fileFormat) 
throws IOException {
+    runTypePromotionCheck(
+        fileFormat,
+        Types.FloatType.get(),
+        Types.DoubleType.get(),
+        value -> value == null ? null : ((Float) value).doubleValue());
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionTypePromotionDecimalPrecision(FileFormat fileFormat) 
throws IOException {
+    runTypePromotionCheck(
+        fileFormat, Types.DecimalType.of(9, 2), Types.DecimalType.of(18, 2), 
Function.identity());
+  }
+
+  /**
+   * Schema evolution: Reorder columns. Write with DefaultSchema {col_a, 
col_b, col_c, col_d,
+   * col_e}, read with reordered schema {col_e, col_c, col_a, col_d, col_b}.
+   */
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionReorderColumns(FileFormat fileFormat) throws 
IOException {
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema writeSchema = dataGenerator.schema();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+    Schema reorderedSchema =
+        new Schema(
+            Types.NestedField.required(5, "col_e", Types.DoubleType.get()),
+            Types.NestedField.required(3, "col_c", Types.LongType.get()),
+            Types.NestedField.required(1, "col_a", Types.StringType.get()),
+            Types.NestedField.required(4, "col_d", Types.FloatType.get()),
+            Types.NestedField.required(2, "col_b", Types.IntegerType.get()));
+
+    readAndAssertEngineRecords(
+        fileFormat,
+        reorderedSchema,
+        genericRecords,
+        record -> copy(record, reorderedSchema, reorderedSchema));
+  }
+
+  /**
+   * Schema evolution: Rename columns. Write with DefaultSchema, then read with a schema where
+   * field ID 2 (col_b) is renamed to "column_b" and field ID 4 (col_d) to "column_d".
+   * Iceberg binds by field ID, so the renamed columns should still read the original data.
+   */
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionRenameColumn(FileFormat fileFormat) throws 
IOException {
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema writeSchema = dataGenerator.schema();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+    // rename col_b(id=2) -> column_b, col_d(id=4) -> column_d
+    Schema renamedSchema =
+        new Schema(
+            Types.NestedField.required(1, "col_a", Types.StringType.get()),
+            Types.NestedField.required(2, "column_b", Types.IntegerType.get()),
+            Types.NestedField.required(3, "col_c", Types.LongType.get()),
+            Types.NestedField.required(4, "column_d", Types.FloatType.get()),
+            Types.NestedField.required(5, "col_e", Types.DoubleType.get()));
+
+    readAndAssertEngineRecords(
+        fileFormat,
+        renamedSchema,
+        genericRecords,
+        record -> {
+          Record expected = GenericRecord.create(renamedSchema);
+          expected.setField("col_a", record.getField("col_a"));
+          expected.setField("column_b", record.getField("col_b"));
+          expected.setField("col_c", record.getField("col_c"));
+          expected.setField("column_d", record.getField("col_d"));
+          expected.setField("col_e", record.getField("col_e"));
+          return expected;
+        });
+  }
+
+  /**
+   * Schema evolution: Required → Optional. Write with DefaultSchema where all 
columns are required.
+   * Read with a schema where some columns are changed to optional. Iceberg 
allows widening required
+   * to optional. The data should still be read correctly.
+   */
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionRequiredToOptional(FileFormat fileFormat) throws 
IOException {
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema writeSchema = dataGenerator.schema();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+    // change col_b and col_d to optional
+    Schema readSchema =
+        new Schema(
+            Types.NestedField.required(1, "col_a", Types.StringType.get()),
+            Types.NestedField.optional(2, "col_b", Types.IntegerType.get()),
+            Types.NestedField.required(3, "col_c", Types.LongType.get()),
+            Types.NestedField.optional(4, "col_d", Types.FloatType.get()),
+            Types.NestedField.required(5, "col_e", Types.DoubleType.get()));
+
+    readAndAssertEngineRecords(
+        fileFormat, readSchema, genericRecords, record -> copy(record, 
readSchema, readSchema));
+  }
+
+  /**
+   * Schema evolution: Read with empty projection. Write with DefaultSchema, 
read with an empty
+   * schema (no columns). The reader should return the correct number of rows 
but with no data
+   * columns.
+   */
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testSchemaEvolutionEmptyProjection(FileFormat fileFormat) throws 
IOException {
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema writeSchema = dataGenerator.schema();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+    Schema emptySchema = new Schema();
+
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(emptySchema)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    assertThat(readRecords).hasSameSizeAs(genericRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testReadFileWithoutFieldIdsUsingNameMapping(FileFormat fileFormat) 
throws IOException {
+    DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+    Schema icebergSchema = dataGenerator.schema();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    // Write the file WITHOUT Iceberg field IDs (as an external writer would).
+    writeRecordsWithoutFieldIds(fileFormat, icebergSchema, genericRecords);
+
+    NameMapping nameMapping = MappingUtil.create(icebergSchema);
+
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(icebergSchema)
+            .withNameMapping(nameMapping)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    assertEquals(icebergSchema, convertToEngineRecords(genericRecords, 
icebergSchema), readRecords);
+  }
+
   private void readAndAssertGenericRecords(
       FileFormat fileFormat, Schema schema, List<Record> expected) throws 
IOException {
     InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
@@ -1777,4 +2079,152 @@ public abstract class BaseFormatModelTests<T> {
 
     return result;
   }
+
+  private void writeRecordsWithoutFieldIds(
+      FileFormat fileFormat, Schema schema, List<Record> records) throws 
IOException {
+    switch (fileFormat) {
+      case PARQUET -> writeParquetWithoutFieldIds(schema, records);
+      case AVRO -> writeAvroWithoutFieldIds(schema, records);
+      case ORC -> writeOrcWithoutFieldIds(schema, records);
+      default -> throw new UnsupportedOperationException("Unsupported file 
format: " + fileFormat);
+    }
+  }
+
+  private void writeAvroWithoutFieldIds(Schema schema, List<Record> records) 
throws IOException {
+    org.apache.avro.Schema avroSchemaWithoutIds = 
AvroTestHelpers.removeIds(schema);
+
+    OutputFile outputFile = encryptedFile.encryptingOutputFile();
+    DatumWriter<GenericData.Record> datumWriter = new 
GenericDatumWriter<>(avroSchemaWithoutIds);
+    try (OutputStream out = outputFile.create();
+        DataFileWriter<GenericData.Record> writer = new 
DataFileWriter<>(datumWriter)) {
+      writer.create(avroSchemaWithoutIds, out);
+      for (Record record : records) {
+        GenericData.Record avroRecord = new 
GenericData.Record(avroSchemaWithoutIds);
+        for (Types.NestedField field : schema.columns()) {
+          avroRecord.put(field.name(), record.getField(field.name()));
+        }
+
+        writer.append(avroRecord);
+      }
+    }
+
+    try (DataFileStream<GenericData.Record> reader =
+        new DataFileStream<>(outputFile.toInputFile().newStream(), new 
GenericDatumReader<>())) {
+      assertThat(AvroTestHelpers.hasIds(reader.getSchema())).isFalse();
+    }
+  }
+
+  private void writeParquetWithoutFieldIds(Schema schema, List<Record> 
records) throws IOException {
+    org.apache.avro.Schema avroSchemaWithoutIds = 
AvroTestHelpers.removeIds(schema);
+
+    OutputFile outputFile = encryptedFile.encryptingOutputFile();
+
+    try (ParquetWriter<GenericData.Record> writer =
+        
AvroParquetWriter.<GenericData.Record>builder(ParquetFileTestUtils.file(outputFile))
+            .withDataModel(GenericData.get())
+            .withSchema(avroSchemaWithoutIds)
+            .withConf(new Configuration())
+            .build()) {
+      for (Record record : records) {
+        GenericData.Record avroRecord = new 
GenericData.Record(avroSchemaWithoutIds);
+        for (Types.NestedField field : schema.columns()) {
+          avroRecord.put(field.name(), record.getField(field.name()));
+        }
+
+        writer.write(avroRecord);
+      }
+    }
+
+    try (ParquetFileReader reader =
+        
ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) {
+      
assertThat(ParquetSchemaUtil.hasIds(reader.getFooter().getFileMetaData().getSchema()))
+          .isFalse();
+    }
+  }
+
+  private void writeOrcWithoutFieldIds(Schema schema, List<Record> records) 
throws IOException {
+    TypeDescription typeWithIds = ORCSchemaUtil.convert(schema);
+    TypeDescription typeWithoutIds = TestORCSchemaUtil.removeIds(typeWithIds);
+
+    OutputFile outputFile = encryptedFile.encryptingOutputFile();
+    Path hadoopPath = new Path(outputFile.location());
+
+    Configuration conf = new Configuration();
+    OrcFile.WriterOptions options =
+        OrcFile.writerOptions(conf)
+            .useUTCTimestamp(true)
+            .setSchema(typeWithoutIds)
+            .fileSystem(OrcWritingTestUtils.outputFileSystem(outputFile));
+
+    OrcRowWriter<Record> rowWriter = GenericOrcWriter.buildWriter(schema, 
typeWithIds);
+
+    try (Writer orcWriter = OrcFile.createWriter(hadoopPath, options)) {
+      VectorizedRowBatch batch = typeWithoutIds.createRowBatch();
+      for (Record record : records) {
+        rowWriter.write(record, batch);
+        if (batch.size == batch.getMaxSize()) {
+          orcWriter.addRowBatch(batch);
+          batch.reset();
+        }
+      }
+
+      if (batch.size > 0) {
+        orcWriter.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+
+    InputFile inputFile = outputFile.toInputFile();
+    OrcFile.ReaderOptions readerOptions =
+        OrcFile.readerOptions(conf)
+            .useUTCTimestamp(true)
+            .filesystem(OrcWritingTestUtils.inputFileSystem(inputFile))
+            .maxLength(inputFile.getLength());
+
+    try (Reader reader = OrcFile.createReader(hadoopPath, readerOptions)) {
+      assertThat(TestORCSchemaUtil.hasIds(reader.getSchema())).isFalse();
+    }
+  }
+
+  private void runTypePromotionCheck(
+      FileFormat fileFormat, Type fromType, Type toType, Function<Object, 
Object> promoteValue)
+      throws IOException {
+    String columnName = "col";
+    Schema writeSchema = new Schema(Types.NestedField.required(1, columnName, 
fromType));
+    Schema readSchema = new Schema(Types.NestedField.required(1, columnName, 
toType));
+
+    List<Record> genericRecords = RandomGenericData.generate(writeSchema, 10, 
1L);
+    writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+    readAndAssertEngineRecords(
+        fileFormat,
+        readSchema,
+        genericRecords,
+        record -> {
+          Record expected = GenericRecord.create(readSchema);
+          expected.setField(columnName, 
promoteValue.apply(record.getField(columnName)));
+          return expected;
+        });
+  }
+
+  private void readAndAssertEngineRecords(
+      FileFormat fileFormat,
+      Schema readSchema,
+      List<Record> sourceRecords,
+      Function<Record, Record> converter)
+      throws IOException {
+    List<Record> expectedGenericRecords = 
sourceRecords.stream().map(converter).toList();
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(readSchema)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    assertThat(readRecords).hasSize(expectedGenericRecords.size());
+    assertEquals(
+        readSchema, convertToEngineRecords(expectedGenericRecords, 
readSchema), readRecords);
+  }
 }
diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index 467b0fa8c9..41f2489c80 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -84,6 +84,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-data', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 
'testArtifacts')
+    testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // By default, hive-exec is a fat/uber jar and it exports a guava library
     // that's really old. We use the core classifier to be able to override 
our guava
diff --git a/flink/v2.0/build.gradle b/flink/v2.0/build.gradle
index f80a312421..7bc37b30e5 100644
--- a/flink/v2.0/build.gradle
+++ b/flink/v2.0/build.gradle
@@ -84,6 +84,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-data', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 
'testArtifacts')
+    testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // By default, hive-exec is a fat/uber jar and it exports a guava library
     // that's really old. We use the core classifier to be able to override 
our guava
diff --git a/flink/v2.1/build.gradle b/flink/v2.1/build.gradle
index 451f144147..f93b61646e 100644
--- a/flink/v2.1/build.gradle
+++ b/flink/v2.1/build.gradle
@@ -84,6 +84,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-data', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 
'testArtifacts')
+    testImplementation(testFixtures(project(':iceberg-parquet')))
 
     // By default, hive-exec is a fat/uber jar and it exports a guava library
     // that's really old. We use the core classifier to be able to override 
our guava
diff --git a/orc/src/test/java/org/apache/iceberg/orc/OrcWritingTestUtils.java 
b/orc/src/test/java/org/apache/iceberg/orc/OrcWritingTestUtils.java
new file mode 100644
index 0000000000..72ed03ce2c
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/OrcWritingTestUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+
+public class OrcWritingTestUtils {
+  private OrcWritingTestUtils() {}
+
+  public static FileSystem outputFileSystem(OutputFile file) {
+    return new FileIOFSUtil.OutputFileSystem(file);
+  }
+
+  public static FileSystem inputFileSystem(InputFile file) {
+    return new FileIOFSUtil.InputFileSystem(file);
+  }
+}
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java 
b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
index c19e36be3a..e331ca94a2 100644
--- a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
@@ -560,4 +560,12 @@ public class TestORCSchemaUtil {
 
     return true;
   }
+
+  public static TypeDescription removeIds(TypeDescription type) {
+    return ORCSchemaUtil.removeIds(type);
+  }
+
+  public static boolean hasIds(TypeDescription orcSchema) {
+    return ORCSchemaUtil.hasIds(orcSchema);
+  }
 }
diff --git 
a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java 
b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
index b8cd38f56d..441073d34a 100644
--- 
a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
+++ 
b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -35,7 +35,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.parquet.schema.MessageType;
 
 /** Utilities for tests that need to write Parquet files. */
-class ParquetWritingTestUtils {
+public class ParquetWritingTestUtils {
 
   private ParquetWritingTestUtils() {}
 
diff --git 
a/parquet/src/testFixtures/java/org/apache/iceberg/parquet/ParquetFileTestUtils.java
 
b/parquet/src/testFixtures/java/org/apache/iceberg/parquet/ParquetFileTestUtils.java
new file mode 100644
index 0000000000..a6055424c0
--- /dev/null
+++ 
b/parquet/src/testFixtures/java/org/apache/iceberg/parquet/ParquetFileTestUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+
+/** Utilities for tests that need to convert Iceberg input/output files to Parquet ones. */
+public class ParquetFileTestUtils {
+
+  private ParquetFileTestUtils() {}
+
+  public static OutputFile file(org.apache.iceberg.io.OutputFile file) {
+    return ParquetIO.file(file);
+  }
+
+  public static InputFile file(org.apache.iceberg.io.InputFile file) {
+    return ParquetIO.file(file);
+  }
+}
diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index ead4a32f49..57e4853177 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -105,8 +105,10 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-data', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 
'testArtifacts')
     testImplementation (project(path: ':iceberg-open-api', configuration: 
'testFixturesRuntimeElements'))
     testImplementation libs.awaitility
+    testImplementation(testFixtures(project(':iceberg-parquet')))
   }
 
   test {
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index a69b78e5ad..68bdb1c21a 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -105,6 +105,7 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-data', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 
'testArtifacts')
     testImplementation (project(path: ':iceberg-open-api', configuration: 
'testFixturesRuntimeElements'))
     testImplementation libs.awaitility
     testImplementation(testFixtures(project(':iceberg-parquet')))
diff --git a/spark/v4.0/build.gradle b/spark/v4.0/build.gradle
index ba2e0fd4ba..3707e01e48 100644
--- a/spark/v4.0/build.gradle
+++ b/spark/v4.0/build.gradle
@@ -105,6 +105,7 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-data', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 
'testArtifacts')
     testImplementation (project(path: ':iceberg-open-api', configuration: 
'testFixturesRuntimeElements'))
     testImplementation libs.awaitility
     testImplementation(testFixtures(project(':iceberg-parquet')))
diff --git a/spark/v4.1/build.gradle b/spark/v4.1/build.gradle
index 02e4323e70..e6455fa34f 100644
--- a/spark/v4.1/build.gradle
+++ b/spark/v4.1/build.gradle
@@ -105,6 +105,7 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-data', configuration: 
'testArtifacts')
+    testImplementation project(path: ':iceberg-orc', configuration: 
'testArtifacts')
     testImplementation (project(path: ':iceberg-open-api', configuration: 
'testFixturesRuntimeElements'))
     testImplementation libs.awaitility
     testImplementation(testFixtures(project(':iceberg-parquet')))

Reply via email to