This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b38573dcfb Core: Add basic classes for writing table format-version 4 
(#13123)
b38573dcfb is described below

commit b38573dcfb4a5467a70c28573a2d39874718a2a1
Author: Eduard Tudenhoefner <etudenhoef...@gmail.com>
AuthorDate: Wed Jun 4 10:49:18 2025 +0200

    Core: Add basic classes for writing table format-version 4 (#13123)
---
 .../test/java/org/apache/iceberg/TestHelpers.java  |   6 +-
 .../java/org/apache/iceberg/ManifestFiles.java     |   4 +
 .../org/apache/iceberg/ManifestListWriter.java     |  58 +++
 .../java/org/apache/iceberg/ManifestLists.java     |   3 +
 .../java/org/apache/iceberg/ManifestWriter.java    |  75 +++
 .../apache/iceberg/MergingSnapshotProducer.java    |   7 +
 .../java/org/apache/iceberg/TableMetadata.java     |   2 +-
 .../main/java/org/apache/iceberg/V4Metadata.java   | 526 +++++++++++++++++++++
 .../apache/iceberg/DeleteFileIndexTestBase.java    |   4 +-
 .../iceberg/ScanPlanningAndReportingTestBase.java  |   5 +-
 .../src/test/java/org/apache/iceberg/TestBase.java |   3 +
 .../org/apache/iceberg/TestCommitReporting.java    |   5 +-
 .../java/org/apache/iceberg/TestDeleteFiles.java   |  10 +-
 .../org/apache/iceberg/TestFormatVersions.java     |   7 +-
 .../org/apache/iceberg/TestLocalFilterFiles.java   |   7 +-
 .../java/org/apache/iceberg/TestMergeAppend.java   |  12 +-
 .../apache/iceberg/TestMetadataTableFilters.java   |  46 +-
 .../test/java/org/apache/iceberg/TestMetrics.java  |   6 +-
 .../java/org/apache/iceberg/TestMetricsModes.java  |   6 +-
 .../java/org/apache/iceberg/TestOverwrite.java     |  11 +-
 .../iceberg/TestOverwriteWithValidation.java       |  11 +-
 .../org/apache/iceberg/TestPartitionSpecInfo.java  |   6 +-
 .../java/org/apache/iceberg/TestRewriteFiles.java  |   2 +-
 .../test/java/org/apache/iceberg/TestRowDelta.java |  11 +-
 .../org/apache/iceberg/TestRowLineageMetadata.java |  22 +-
 .../iceberg/TestScansAndSchemaEvolution.java       |   6 +-
 .../java/org/apache/iceberg/TestSortOrder.java     |   6 +-
 .../resources/TableMetadataUnsupportedVersion.json |   2 +-
 28 files changed, 762 insertions(+), 107 deletions(-)

diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java 
b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index fa6a081d76..ef83b26db4 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -54,9 +54,13 @@ public class TestHelpers {
 
   private TestHelpers() {}
 
-  public static final int MAX_FORMAT_VERSION = 3;
+  public static final int MAX_FORMAT_VERSION = 4;
   public static final List<Integer> ALL_VERSIONS =
       IntStream.rangeClosed(1, 
MAX_FORMAT_VERSION).boxed().collect(Collectors.toUnmodifiableList());
+  public static final List<Integer> V2_AND_ABOVE =
+      IntStream.rangeClosed(2, 
MAX_FORMAT_VERSION).boxed().collect(Collectors.toUnmodifiableList());
+  public static final List<Integer> V3_AND_ABOVE =
+      IntStream.rangeClosed(3, 
MAX_FORMAT_VERSION).boxed().collect(Collectors.toUnmodifiableList());
 
   /** Wait in a tight check loop until system clock is past {@code 
timestampMillis} */
   public static long waitUntilAfter(long timestampMillis) {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java 
b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index fe87a21678..739f0be251 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -208,6 +208,8 @@ public class ManifestFiles {
         return new ManifestWriter.V2Writer(spec, encryptedOutputFile, 
snapshotId);
       case 3:
         return new ManifestWriter.V3Writer(spec, encryptedOutputFile, 
snapshotId, firstRowId);
+      case 4:
+        return new ManifestWriter.V4Writer(spec, encryptedOutputFile, 
snapshotId, firstRowId);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest for table version: " + formatVersion);
@@ -266,6 +268,8 @@ public class ManifestFiles {
         return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
       case 3:
         return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId);
+      case 4:
+        return new ManifestWriter.V4DeleteWriter(spec, outputFile, snapshotId);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest for table version: " + formatVersion);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java 
b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index c2cb1bf8c8..7525e76365 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -73,6 +73,64 @@ abstract class ManifestListWriter implements 
FileAppender<ManifestFile> {
     return null;
   }
 
+  static class V4Writer extends ManifestListWriter {
+    private final V4Metadata.ManifestFileWrapper wrapper;
+    private Long nextRowId;
+
+    V4Writer(
+        OutputFile snapshotFile,
+        long snapshotId,
+        Long parentSnapshotId,
+        long sequenceNumber,
+        long firstRowId) {
+      super(
+          snapshotFile,
+          ImmutableMap.of(
+              "snapshot-id", String.valueOf(snapshotId),
+              "parent-snapshot-id", String.valueOf(parentSnapshotId),
+              "sequence-number", String.valueOf(sequenceNumber),
+              "first-row-id", String.valueOf(firstRowId),
+              "format-version", "4"));
+      this.wrapper = new V4Metadata.ManifestFileWrapper(snapshotId, 
sequenceNumber);
+      this.nextRowId = firstRowId;
+    }
+
+    @Override
+    protected ManifestFile prepare(ManifestFile manifest) {
+      if (manifest.content() != ManifestContent.DATA || manifest.firstRowId() 
!= null) {
+        return wrapper.wrap(manifest, null);
+      } else {
+        // assign first-row-id and update the next to assign
+        wrapper.wrap(manifest, nextRowId);
+        // leave space for existing and added rows, in case any of the 
existing data files do not
+        // have an assigned first-row-id (this is the case with manifests from 
pre-v3 snapshots)
+        this.nextRowId += manifest.existingRowsCount() + 
manifest.addedRowsCount();
+        return wrapper;
+      }
+    }
+
+    @Override
+    protected FileAppender<ManifestFile> newAppender(OutputFile file, 
Map<String, String> meta) {
+      try {
+        return InternalData.write(FileFormat.AVRO, file)
+            .schema(V4Metadata.MANIFEST_LIST_SCHEMA)
+            .named("manifest_file")
+            .meta(meta)
+            .overwrite()
+            .build();
+
+      } catch (IOException e) {
+        throw new RuntimeIOException(
+            e, "Failed to create snapshot list writer for path: %s", 
file.location());
+      }
+    }
+
+    @Override
+    public Long nextRowId() {
+      return nextRowId;
+    }
+  }
+
   static class V3Writer extends ManifestListWriter {
     private final V3Metadata.ManifestFileWrapper wrapper;
     private Long nextRowId;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java 
b/core/src/main/java/org/apache/iceberg/ManifestLists.java
index 0271baaac0..47be11c321 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestLists.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java
@@ -67,6 +67,9 @@ class ManifestLists {
       case 3:
         return new ManifestListWriter.V3Writer(
             manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, 
firstRowId);
+      case 4:
+        return new ManifestListWriter.V4Writer(
+            manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, 
firstRowId);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest list for table version: " + formatVersion);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java 
b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 95064759eb..fd560b2b83 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -221,6 +221,81 @@ public abstract class ManifestWriter<F extends 
ContentFile<F>> implements FileAp
     writer.close();
   }
 
+  static class V4Writer extends ManifestWriter<DataFile> {
+    private final V4Metadata.ManifestEntryWrapper<DataFile> entryWrapper;
+
+    V4Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId, 
Long firstRowId) {
+      super(spec, file, snapshotId, firstRowId);
+      this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+    }
+
+    @Override
+    protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
+      return entryWrapper.wrap(entry);
+    }
+
+    @Override
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(
+        PartitionSpec spec, OutputFile file) {
+      Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
+      try {
+        return InternalData.write(FileFormat.AVRO, file)
+            .schema(manifestSchema)
+            .named("manifest_entry")
+            .meta("schema", SchemaParser.toJson(spec.schema()))
+            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+            .meta("partition-spec-id", String.valueOf(spec.specId()))
+            .meta("format-version", "4")
+            .meta("content", "data")
+            .overwrite()
+            .build();
+      } catch (IOException e) {
+        throw new RuntimeIOException(
+            e, "Failed to create manifest writer for path: %s", 
file.location());
+      }
+    }
+  }
+
+  static class V4DeleteWriter extends ManifestWriter<DeleteFile> {
+    private final V4Metadata.ManifestEntryWrapper<DeleteFile> entryWrapper;
+
+    V4DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long 
snapshotId) {
+      super(spec, file, snapshotId, null);
+      this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+    }
+
+    @Override
+    protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> 
entry) {
+      return entryWrapper.wrap(entry);
+    }
+
+    @Override
+    protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
+        PartitionSpec spec, OutputFile file) {
+      Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
+      try {
+        return InternalData.write(FileFormat.AVRO, file)
+            .schema(manifestSchema)
+            .named("manifest_entry")
+            .meta("schema", SchemaParser.toJson(spec.schema()))
+            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+            .meta("partition-spec-id", String.valueOf(spec.specId()))
+            .meta("format-version", "4")
+            .meta("content", "deletes")
+            .overwrite()
+            .build();
+      } catch (IOException e) {
+        throw new RuntimeIOException(
+            e, "Failed to create manifest writer for path: %s", 
file.location());
+      }
+    }
+
+    @Override
+    protected ManifestContent content() {
+      return ManifestContent.DELETES;
+    }
+  }
+
   static class V3Writer extends ManifestWriter<DataFile> {
     private final V3Metadata.ManifestEntryWrapper<DataFile> entryWrapper;
 
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java 
b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index bc9bf7b6e1..cae11dce55 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -295,6 +295,13 @@ abstract class MergingSnapshotProducer<ThisT> extends 
SnapshotProducer<ThisT> {
             formatVersion(),
             file.location());
         break;
+      case 4:
+        Preconditions.checkArgument(
+            file.content() == FileContent.EQUALITY_DELETES || 
ContentFileUtil.isDV(file),
+            "Must use DVs for position deletes in V%s: %s",
+            formatVersion(),
+            file.location());
+        break;
       default:
         throw new IllegalArgumentException("Unsupported format version: " + 
formatVersion());
     }
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java 
b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index f932827b76..681407543f 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -54,7 +54,7 @@ public class TableMetadata implements Serializable {
   static final long INITIAL_SEQUENCE_NUMBER = 0;
   static final long INVALID_SEQUENCE_NUMBER = -1;
   static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
-  static final int SUPPORTED_TABLE_FORMAT_VERSION = 3;
+  static final int SUPPORTED_TABLE_FORMAT_VERSION = 4;
   static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
   static final int INITIAL_SPEC_ID = 0;
   static final int INITIAL_SORT_ORDER_ID = 1;
diff --git a/core/src/main/java/org/apache/iceberg/V4Metadata.java 
b/core/src/main/java/org/apache/iceberg/V4Metadata.java
new file mode 100644
index 0000000000..67478290aa
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/V4Metadata.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+class V4Metadata {
+  private V4Metadata() {}
+
+  static final Schema MANIFEST_LIST_SCHEMA =
+      new Schema(
+          ManifestFile.PATH,
+          ManifestFile.LENGTH,
+          ManifestFile.SPEC_ID,
+          ManifestFile.MANIFEST_CONTENT.asRequired(),
+          ManifestFile.SEQUENCE_NUMBER.asRequired(),
+          ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
+          ManifestFile.SNAPSHOT_ID,
+          ManifestFile.ADDED_FILES_COUNT.asRequired(),
+          ManifestFile.EXISTING_FILES_COUNT.asRequired(),
+          ManifestFile.DELETED_FILES_COUNT.asRequired(),
+          ManifestFile.ADDED_ROWS_COUNT.asRequired(),
+          ManifestFile.EXISTING_ROWS_COUNT.asRequired(),
+          ManifestFile.DELETED_ROWS_COUNT.asRequired(),
+          ManifestFile.PARTITION_SUMMARIES,
+          ManifestFile.KEY_METADATA,
+          ManifestFile.FIRST_ROW_ID);
+
+  /**
+   * A wrapper class to write any ManifestFile implementation to Avro using 
the v4 write schema.
+   *
+   * <p>An unassigned sequence number or min sequence number is written as the sequence number of
+   * the current commit, and a non-null first-row-id given to {@code wrap} is written as-is.
+   */
+  static class ManifestFileWrapper implements ManifestFile, StructLike {
+    private final long commitSnapshotId;
+    private final long sequenceNumber;
+    private ManifestFile wrapped = null;
+    private Long wrappedFirstRowId = null;
+
+    ManifestFileWrapper(long commitSnapshotId, long sequenceNumber) {
+      this.commitSnapshotId = commitSnapshotId;
+      this.sequenceNumber = sequenceNumber;
+    }
+
+    public ManifestFile wrap(ManifestFile file, Long firstRowId) {
+      this.wrapped = file;
+      this.wrappedFirstRowId = firstRowId;
+      return this;
+    }
+
+    @Override
+    public int size() {
+      return MANIFEST_LIST_SCHEMA.columns().size();
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("Cannot modify 
ManifestFileWrapper wrapper via set");
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return javaClass.cast(get(pos));
+    }
+
+    private Object get(int pos) {
+      switch (pos) {
+        case 0:
+          return wrapped.path();
+        case 1:
+          return wrapped.length();
+        case 2:
+          return wrapped.partitionSpecId();
+        case 3:
+          return wrapped.content().id();
+        case 4:
+          if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
+            // if the sequence number is being assigned here, then the manifest must be created by
+            // the current operation. to validate this, check that the snapshot id matches the
+            // current commit
+            Preconditions.checkState(
+                commitSnapshotId == wrapped.snapshotId(),
+                "Found unassigned sequence number for a manifest from 
snapshot: %s",
+                wrapped.snapshotId());
+            return sequenceNumber;
+          } else {
+            return wrapped.sequenceNumber();
+          }
+        case 5:
+          if (wrapped.minSequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
+            // same sanity check as above
+            Preconditions.checkState(
+                commitSnapshotId == wrapped.snapshotId(),
+                "Found unassigned sequence number for a manifest from 
snapshot: %s",
+                wrapped.snapshotId());
+            // if the min sequence number is not determined, then there was no 
assigned sequence
+            // number for any file
+            // written to the wrapped manifest. replace the unassigned 
sequence number with the one
+            // for this commit
+            return sequenceNumber;
+          } else {
+            return wrapped.minSequenceNumber();
+          }
+        case 6:
+          return wrapped.snapshotId();
+        case 7:
+          return wrapped.addedFilesCount();
+        case 8:
+          return wrapped.existingFilesCount();
+        case 9:
+          return wrapped.deletedFilesCount();
+        case 10:
+          return wrapped.addedRowsCount();
+        case 11:
+          return wrapped.existingRowsCount();
+        case 12:
+          return wrapped.deletedRowsCount();
+        case 13:
+          return wrapped.partitions();
+        case 14:
+          return wrapped.keyMetadata();
+        case 15:
+          if (wrappedFirstRowId != null) {
+            // if first-row-id is assigned, ensure that it is valid
+            Preconditions.checkState(
+                wrapped.content() == ManifestContent.DATA && 
wrapped.firstRowId() == null,
+                "Found invalid first-row-id assignment: %s",
+                wrapped);
+            return wrappedFirstRowId;
+          } else if (wrapped.content() != ManifestContent.DATA) {
+            return null;
+          } else {
+            Preconditions.checkState(
+                wrapped.firstRowId() != null,
+                "Found unassigned first-row-id for file: " + wrapped.path());
+            return wrapped.firstRowId();
+          }
+        default:
+          throw new UnsupportedOperationException("Unknown field ordinal: " + 
pos);
+      }
+    }
+
+    @Override
+    public String path() {
+      return wrapped.path();
+    }
+
+    @Override
+    public long length() {
+      return wrapped.length();
+    }
+
+    @Override
+    public int partitionSpecId() {
+      return wrapped.partitionSpecId();
+    }
+
+    @Override
+    public ManifestContent content() {
+      return wrapped.content();
+    }
+
+    @Override
+    public long sequenceNumber() {
+      return wrapped.sequenceNumber();
+    }
+
+    @Override
+    public long minSequenceNumber() {
+      return wrapped.minSequenceNumber();
+    }
+
+    @Override
+    public Long snapshotId() {
+      return wrapped.snapshotId();
+    }
+
+    @Override
+    public boolean hasAddedFiles() {
+      return wrapped.hasAddedFiles();
+    }
+
+    @Override
+    public Integer addedFilesCount() {
+      return wrapped.addedFilesCount();
+    }
+
+    @Override
+    public Long addedRowsCount() {
+      return wrapped.addedRowsCount();
+    }
+
+    @Override
+    public boolean hasExistingFiles() {
+      return wrapped.hasExistingFiles();
+    }
+
+    @Override
+    public Integer existingFilesCount() {
+      return wrapped.existingFilesCount();
+    }
+
+    @Override
+    public Long existingRowsCount() {
+      return wrapped.existingRowsCount();
+    }
+
+    @Override
+    public boolean hasDeletedFiles() {
+      return wrapped.hasDeletedFiles();
+    }
+
+    @Override
+    public Integer deletedFilesCount() {
+      return wrapped.deletedFilesCount();
+    }
+
+    @Override
+    public Long deletedRowsCount() {
+      return wrapped.deletedRowsCount();
+    }
+
+    @Override
+    public List<PartitionFieldSummary> partitions() {
+      return wrapped.partitions();
+    }
+
+    @Override
+    public ByteBuffer keyMetadata() {
+      return wrapped.keyMetadata();
+    }
+
+    @Override
+    public Long firstRowId() {
+      return wrapped.firstRowId();
+    }
+
+    @Override
+    public ManifestFile copy() {
+      return wrapped.copy();
+    }
+  }
+
+  static Schema entrySchema(Types.StructType partitionType) {
+    return wrapFileSchema(fileType(partitionType));
+  }
+
+  static Schema wrapFileSchema(Types.StructType fileSchema) {
+    // this is used to build projection schemas
+    return new Schema(
+        ManifestEntry.STATUS,
+        ManifestEntry.SNAPSHOT_ID,
+        ManifestEntry.SEQUENCE_NUMBER,
+        ManifestEntry.FILE_SEQUENCE_NUMBER,
+        required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
+  }
+
+  static Types.StructType fileType(Types.StructType partitionType) {
+    return Types.StructType.of(
+        DataFile.CONTENT.asRequired(),
+        DataFile.FILE_PATH,
+        DataFile.FILE_FORMAT,
+        required(
+            DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, 
DataFile.PARTITION_DOC),
+        DataFile.RECORD_COUNT,
+        DataFile.FILE_SIZE,
+        DataFile.COLUMN_SIZES,
+        DataFile.VALUE_COUNTS,
+        DataFile.NULL_VALUE_COUNTS,
+        DataFile.NAN_VALUE_COUNTS,
+        DataFile.LOWER_BOUNDS,
+        DataFile.UPPER_BOUNDS,
+        DataFile.KEY_METADATA,
+        DataFile.SPLIT_OFFSETS,
+        DataFile.EQUALITY_IDS,
+        DataFile.SORT_ORDER_ID,
+        DataFile.FIRST_ROW_ID,
+        DataFile.REFERENCED_DATA_FILE,
+        DataFile.CONTENT_OFFSET,
+        DataFile.CONTENT_SIZE);
+  }
+
+  static class ManifestEntryWrapper<F extends ContentFile<F>>
+      implements ManifestEntry<F>, StructLike {
+    private final int size;
+    private final Long commitSnapshotId;
+    private final DataFileWrapper<?> fileWrapper;
+    private ManifestEntry<F> wrapped = null;
+
+    ManifestEntryWrapper(Long commitSnapshotId) {
+      this.size = entrySchema(Types.StructType.of()).columns().size();
+      this.commitSnapshotId = commitSnapshotId;
+      this.fileWrapper = new DataFileWrapper<>();
+    }
+
+    public ManifestEntryWrapper<F> wrap(ManifestEntry<F> entry) {
+      this.wrapped = entry;
+      return this;
+    }
+
+    @Override
+    public int size() {
+      return size;
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("Cannot modify 
ManifestEntryWrapper wrapper via set");
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return javaClass.cast(get(pos));
+    }
+
+    private Object get(int pos) {
+      switch (pos) {
+        case 0:
+          return wrapped.status().id();
+        case 1:
+          return wrapped.snapshotId();
+        case 2:
+          if (wrapped.dataSequenceNumber() == null) {
+            // if the entry's data sequence number is null,
+            // then it will inherit the sequence number of the current commit.
+            // to validate that this is correct, check that the snapshot id is 
either null (will
+            // also be inherited) or that it matches the id of the current 
commit.
+            Preconditions.checkState(
+                wrapped.snapshotId() == null || 
wrapped.snapshotId().equals(commitSnapshotId),
+                "Found unassigned sequence number for an entry from snapshot: 
%s",
+                wrapped.snapshotId());
+
+            // inheritance should work only for ADDED entries
+            Preconditions.checkState(
+                wrapped.status() == Status.ADDED,
+                "Only entries with status ADDED can have null sequence 
number");
+
+            return null;
+          }
+          return wrapped.dataSequenceNumber();
+        case 3:
+          return wrapped.fileSequenceNumber();
+        case 4:
+          return fileWrapper.wrap(wrapped.file());
+        default:
+          throw new UnsupportedOperationException("Unknown field ordinal: " + 
pos);
+      }
+    }
+
+    @Override
+    public Status status() {
+      return wrapped.status();
+    }
+
+    @Override
+    public Long snapshotId() {
+      return wrapped.snapshotId();
+    }
+
+    @Override
+    public void setSnapshotId(long snapshotId) {
+      wrapped.setSnapshotId(snapshotId);
+    }
+
+    @Override
+    public Long dataSequenceNumber() {
+      return wrapped.dataSequenceNumber();
+    }
+
+    @Override
+    public void setDataSequenceNumber(long dataSequenceNumber) {
+      wrapped.setDataSequenceNumber(dataSequenceNumber);
+    }
+
+    @Override
+    public Long fileSequenceNumber() {
+      return wrapped.fileSequenceNumber();
+    }
+
+    @Override
+    public void setFileSequenceNumber(long fileSequenceNumber) {
+      wrapped.setFileSequenceNumber(fileSequenceNumber);
+    }
+
+    @Override
+    public F file() {
+      return wrapped.file();
+    }
+
+    @Override
+    public ManifestEntry<F> copy() {
+      return wrapped.copy();
+    }
+
+    @Override
+    public ManifestEntry<F> copyWithoutStats() {
+      return wrapped.copyWithoutStats();
+    }
+  }
+
+  /** Wrapper used to write DataFile or DeleteFile to v4 metadata. */
+  static class DataFileWrapper<F extends ContentFile<F>> extends 
Delegates.DelegatingContentFile<F>
+      implements ContentFile<F>, StructLike {
+    private final int size;
+
+    DataFileWrapper() {
+      super(null);
+      this.size = fileType(Types.StructType.of()).fields().size();
+    }
+
+    @SuppressWarnings("unchecked")
+    DataFileWrapper<F> wrap(ContentFile<?> file) {
+      setWrapped((F) file);
+      return this;
+    }
+
+    @Override
+    public int size() {
+      return size;
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("Cannot modify DataFileWrapper 
wrapper via set");
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+      return javaClass.cast(get(pos));
+    }
+
+    private Object get(int pos) {
+      switch (pos) {
+        case 0:
+          return wrapped.content().id();
+        case 1:
+          return wrapped.location();
+        case 2:
+          return wrapped.format() != null ? wrapped.format().toString() : null;
+        case 3:
+          return wrapped.partition();
+        case 4:
+          return wrapped.recordCount();
+        case 5:
+          return wrapped.fileSizeInBytes();
+        case 6:
+          return wrapped.columnSizes();
+        case 7:
+          return wrapped.valueCounts();
+        case 8:
+          return wrapped.nullValueCounts();
+        case 9:
+          return wrapped.nanValueCounts();
+        case 10:
+          return wrapped.lowerBounds();
+        case 11:
+          return wrapped.upperBounds();
+        case 12:
+          return wrapped.keyMetadata();
+        case 13:
+          return wrapped.splitOffsets();
+        case 14:
+          return wrapped.equalityFieldIds();
+        case 15:
+          return wrapped.sortOrderId();
+        case 16:
+          if (wrapped.content() == FileContent.DATA) {
+            return wrapped.firstRowId();
+          } else {
+            return null;
+          }
+        case 17:
+          if (wrapped.content() == FileContent.POSITION_DELETES) {
+            return ((DeleteFile) wrapped).referencedDataFile();
+          } else {
+            return null;
+          }
+        case 18:
+          if (wrapped.content() == FileContent.POSITION_DELETES) {
+            return ((DeleteFile) wrapped).contentOffset();
+          } else {
+            return null;
+          }
+        case 19:
+          if (wrapped.content() == FileContent.POSITION_DELETES) {
+            return ((DeleteFile) wrapped).contentSizeInBytes();
+          } else {
+            return null;
+          }
+      }
+      throw new IllegalArgumentException("Unknown field ordinal: " + pos);
+    }
+
+    @Override
+    public String manifestLocation() {
+      return null;
+    }
+
+    @Override
+    public Long pos() {
+      return null;
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java 
b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
index 70f4436ff1..8645577095 100644
--- a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
@@ -49,8 +49,8 @@ public abstract class DeleteFileIndexTestBase<
     extends TestBase {
 
   @Parameters(name = "formatVersion = {0}")
-  public static List<Object> parameters() {
-    return Arrays.asList(2, 3);
+  protected static List<Integer> formatVersions() {
+    return TestHelpers.V2_AND_ABOVE;
   }
 
   static final DeleteFile FILE_A_EQ_1 =
diff --git 
a/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java 
b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
index 7f7d33a7b5..4fc76db7b4 100644
--- 
a/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
+++ 
b/core/src/test/java/org/apache/iceberg/ScanPlanningAndReportingTestBase.java
@@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.expressions.Expressions;
@@ -46,8 +45,8 @@ public abstract class ScanPlanningAndReportingTestBase<
   private final TestMetricsReporter reporter = new TestMetricsReporter();
 
   @Parameters(name = "formatVersion = {0}")
-  public static List<Object> parameters() {
-    return Arrays.asList(2, 3);
+  protected static List<Integer> formatVersions() {
+    return TestHelpers.V2_AND_ABOVE;
   }
 
   protected abstract ScanT newScan(Table table);
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java 
b/core/src/test/java/org/apache/iceberg/TestBase.java
index 1b5ed5a5ea..5d0919c568 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -386,6 +386,9 @@ public class TestBase {
       case 3:
         manifestEntrySchema = V3Metadata.entrySchema(table.spec().partitionType());
         break;
+      case 4:
+        manifestEntrySchema = V4Metadata.entrySchema(table.spec().partitionType());
+        break;
       default:
         throw new IllegalArgumentException(
             "Unsupported format version: " + table.ops().current().formatVersion());
diff --git a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
index d446f45c8b..d17348a99c 100644
--- a/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
+++ b/core/src/test/java/org/apache/iceberg/TestCommitReporting.java
@@ -20,7 +20,6 @@ package org.apache.iceberg;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.Arrays;
 import java.util.List;
 import org.apache.iceberg.ScanPlanningAndReportingTestBase.TestMetricsReporter;
 import org.apache.iceberg.metrics.CommitMetricsResult;
@@ -35,8 +34,8 @@ public class TestCommitReporting extends TestBase {
   private final TestMetricsReporter reporter = new TestMetricsReporter();
 
   @Parameters(name = "formatVersion = {0}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(2, 3);
+  protected static List<Integer> formatVersions() {
+    return TestHelpers.V2_AND_ABOVE;
   }
 
   @TestTemplate
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
index 6ba65e6b7f..8e4ffed5b8 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
@@ -25,9 +25,9 @@ import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
@@ -83,11 +83,9 @@ public class TestDeleteFiles extends TestBase {
 
   @Parameters(name = "formatVersion = {0}, branch = {1}")
   protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {1, "main"},
-        new Object[] {1, "testBranch"},
-        new Object[] {2, "main"},
-        new Object[] {2, "testBranch"});
+    return TestHelpers.ALL_VERSIONS.stream()
+        .flatMap(v -> Stream.of(new Object[] {v, "main"}, new Object[] {v, "testBranch"}))
+        .collect(Collectors.toList());
   }
 
   @TestTemplate
diff --git a/core/src/test/java/org/apache/iceberg/TestFormatVersions.java b/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
index 3414f1858e..8797705ce4 100644
--- a/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestFormatVersions.java
@@ -21,14 +21,17 @@ package org.apache.iceberg;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.junit.jupiter.api.TestTemplate;
 
 public class TestFormatVersions extends TestBase {
   @Parameters(name = "formatVersion = {0}")
   protected static List<Object> parameters() {
-    return Arrays.asList(1, 2);
+    return TestHelpers.ALL_VERSIONS.stream()
+        // skip the latest supported format version
+        .filter(version -> version < TestHelpers.MAX_FORMAT_VERSION)
+        .collect(Collectors.toList());
   }
 
   @TestTemplate
diff --git a/core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java b/core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
index 8a4f5db256..bfbe187738 100644
--- a/core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
@@ -18,7 +18,8 @@
  */
 package org.apache.iceberg;
 
-import java.util.Arrays;
+import static org.apache.iceberg.TestHelpers.ALL_VERSIONS;
+
 import java.util.List;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -27,8 +28,8 @@ public class TestLocalFilterFiles
     extends FilterFilesTestBase<TableScan, FileScanTask, CombinedScanTask> {
 
   @Parameters(name = "formatVersion = {0}")
-  public static List<Object> parameters() {
-    return Arrays.asList(1, 2, 3);
+  protected static List<Integer> formatVersions() {
+    return ALL_VERSIONS;
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index b6836822df..b4dab67cde 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -22,16 +22,17 @@ import static org.apache.iceberg.relocated.com.google.common.collect.Iterators.c
 import static org.apache.iceberg.util.SnapshotUtil.latestSnapshot;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.TestHelpers.Row;
 import org.apache.iceberg.exceptions.CommitFailedException;
@@ -50,11 +51,9 @@ public class TestMergeAppend extends TestBase {
 
   @Parameters(name = "formatVersion = {0}, branch = {1}")
   protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {1, "main"},
-        new Object[] {1, "testBranch"},
-        new Object[] {2, "main"},
-        new Object[] {2, "testBranch"});
+    return TestHelpers.ALL_VERSIONS.stream()
+        .flatMap(v -> Stream.of(new Object[] {v, "main"}, new Object[] {v, "testBranch"}))
+        .collect(Collectors.toList());
   }
 
   @TestTemplate
@@ -412,6 +411,7 @@ public class TestMergeAppend extends TestBase {
 
   @TestTemplate
   public void testManifestMergeMinCount() throws IOException {
+    assumeThat(formatVersion).isLessThan(3);
     assertThat(listManifestFiles()).isEmpty();
     table
         .updateProperties()
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
index 0762d3b2dc..4b7e87368a 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
@@ -21,12 +21,13 @@ package org.apache.iceberg;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
@@ -49,29 +50,26 @@ public class TestMetadataTableFilters extends TestBase {
 
   @Parameters(name = "formatVersion = {0}, table_type = {1}")
   protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {1, MetadataTableType.DATA_FILES},
-        new Object[] {2, MetadataTableType.DATA_FILES},
-        new Object[] {3, MetadataTableType.DATA_FILES},
-        new Object[] {2, MetadataTableType.DELETE_FILES},
-        new Object[] {3, MetadataTableType.DELETE_FILES},
-        new Object[] {1, MetadataTableType.FILES},
-        new Object[] {2, MetadataTableType.FILES},
-        new Object[] {3, MetadataTableType.FILES},
-        new Object[] {1, MetadataTableType.ALL_DATA_FILES},
-        new Object[] {2, MetadataTableType.ALL_DATA_FILES},
-        new Object[] {3, MetadataTableType.ALL_DATA_FILES},
-        new Object[] {2, MetadataTableType.ALL_DELETE_FILES},
-        new Object[] {3, MetadataTableType.ALL_DELETE_FILES},
-        new Object[] {1, MetadataTableType.ALL_FILES},
-        new Object[] {2, MetadataTableType.ALL_FILES},
-        new Object[] {3, MetadataTableType.ALL_FILES},
-        new Object[] {1, MetadataTableType.ENTRIES},
-        new Object[] {2, MetadataTableType.ENTRIES},
-        new Object[] {3, MetadataTableType.ENTRIES},
-        new Object[] {1, MetadataTableType.ALL_ENTRIES},
-        new Object[] {2, MetadataTableType.ALL_ENTRIES},
-        new Object[] {3, MetadataTableType.ALL_ENTRIES});
+    return TestHelpers.ALL_VERSIONS.stream()
+        .flatMap(
+            v -> {
+              ImmutableList.Builder<Object[]> builder =
+                  ImmutableList.<Object[]>builder()
+                      .add(new Object[] {v, MetadataTableType.DATA_FILES})
+                      .add(new Object[] {v, MetadataTableType.FILES})
+                      .add(new Object[] {v, MetadataTableType.ALL_DATA_FILES})
+                      .add(new Object[] {v, MetadataTableType.ALL_FILES})
+                      .add(new Object[] {v, MetadataTableType.ENTRIES})
+                      .add(new Object[] {v, MetadataTableType.ALL_ENTRIES});
+              if (v >= 2) {
+                builder
+                    .add(new Object[] {v, MetadataTableType.DELETE_FILES})
+                    .add(new Object[] {v, MetadataTableType.ALL_DELETE_FILES});
+              }
+
+              return builder.build().stream();
+            })
+        .collect(Collectors.toList());
   }
 
   @BeforeEach
diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java
index 4da8c480ea..18372ff470 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetrics.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.TestHelpers.ALL_VERSIONS;
 import static org.apache.iceberg.types.Conversions.fromByteBuffer;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
@@ -31,7 +32,6 @@ import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.data.GenericRecord;
@@ -67,8 +67,8 @@ import org.junit.jupiter.api.io.TempDir;
 public abstract class TestMetrics {
 
   @Parameters(name = "formatVersion = {0}")
-  public static List<Object> parameters() {
-    return Arrays.asList(1, 2, 3);
+  protected static List<Integer> formatVersions() {
+    return ALL_VERSIONS;
   }
 
   @TempDir protected Path temp;
diff --git a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
index f766bb69a1..8f9670d9fc 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.TestHelpers.ALL_VERSIONS;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -25,7 +26,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.MetricsModes.Counts;
@@ -46,8 +46,8 @@ public class TestMetricsModes {
   @Parameter private int formatVersion;
 
   @Parameters(name = "formatVersion = {0}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(1, 2, 3);
+  protected static List<Integer> formatVersions() {
+    return ALL_VERSIONS;
   }
 
   @TempDir private Path temp;
diff --git a/core/src/test/java/org/apache/iceberg/TestOverwrite.java b/core/src/test/java/org/apache/iceberg/TestOverwrite.java
index a490350159..c853cf69ea 100644
--- a/core/src/test/java/org/apache/iceberg/TestOverwrite.java
+++ b/core/src/test/java/org/apache/iceberg/TestOverwrite.java
@@ -30,8 +30,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -109,11 +110,9 @@ public class TestOverwrite extends TestBase {
 
   @Parameters(name = "formatVersion = {0}, branch = {1}")
   protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {1, "main"},
-        new Object[] {1, "testBranch"},
-        new Object[] {2, "main"},
-        new Object[] {2, "testBranch"});
+    return TestHelpers.ALL_VERSIONS.stream()
+        .flatMap(v -> Stream.of(new Object[] {v, "main"}, new Object[] {v, "testBranch"}))
+        .collect(Collectors.toList());
   }
 
   private static ByteBuffer longToBuffer(long value) {
diff --git a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
index 3ad236e6c2..f984002f46 100644
--- a/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
+++ b/core/src/test/java/org/apache/iceberg/TestOverwriteWithValidation.java
@@ -33,8 +33,9 @@ import static org.assertj.core.api.Assumptions.assumeThat;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -184,11 +185,9 @@ public class TestOverwriteWithValidation extends TestBase {
 
   @Parameters(name = "formatVersion = {0}, branch = {1}")
   protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {1, "main"},
-        new Object[] {1, "testBranch"},
-        new Object[] {2, "main"},
-        new Object[] {2, "testBranch"});
+    return TestHelpers.ALL_VERSIONS.stream()
+        .flatMap(v -> Stream.of(new Object[] {v, "main"}, new Object[] {v, "testBranch"}))
+        .collect(Collectors.toList());
   }
 
   private static ByteBuffer longToBuffer(long value) {
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java b/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java
index a3f3c18ee6..b5f177fee2 100644
--- a/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java
+++ b/core/src/test/java/org/apache/iceberg/TestPartitionSpecInfo.java
@@ -18,13 +18,13 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.TestHelpers.ALL_VERSIONS;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.entry;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.List;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.AfterEach;
@@ -42,8 +42,8 @@ public class TestPartitionSpecInfo {
           required(1, "id", Types.IntegerType.get()), required(2, "data", Types.StringType.get()));
 
   @Parameters(name = "formatVersion = {0}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(1, 2, 3);
+  protected static List<Integer> formatVersions() {
+    return ALL_VERSIONS;
   }
 
   @Parameter private int formatVersion;
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
index 25018be23d..96427576b4 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java
@@ -48,7 +48,7 @@ public class TestRewriteFiles extends TestBase {
   @Parameters(name = "formatVersion = {0}, branch = {1}")
   protected static List<Object> parameters() {
     return TestHelpers.ALL_VERSIONS.stream()
-        .flatMap(i -> Stream.of(new Object[] {i, "main"}, new Object[] {i, "branch"}))
+        .flatMap(v -> Stream.of(new Object[] {v, "main"}, new Object[] {v, "branch"}))
         .collect(Collectors.toList());
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestRowDelta.java b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
index cee1ed6d4b..744281e8d8 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowDelta.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowDelta.java
@@ -32,10 +32,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
@@ -57,11 +58,9 @@ public class TestRowDelta extends TestBase {
 
   @Parameters(name = "formatVersion = {0}, branch = {1}")
   protected static List<Object> parameters() {
-    return Arrays.asList(
-        new Object[] {2, "main"},
-        new Object[] {2, "testBranch"},
-        new Object[] {3, "main"},
-        new Object[] {3, "testBranch"});
+    return TestHelpers.V2_AND_ABOVE.stream()
+        .flatMap(v -> Stream.of(new Object[] {v, "main"}, new Object[] {v, "testBranch"}))
+        .collect(Collectors.toList());
   }
 
   @TestTemplate
diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
index e5b6c5b6ef..4fd4f9d361 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
@@ -20,7 +20,6 @@ package org.apache.iceberg;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.util.List;
@@ -37,7 +36,7 @@ import org.junit.jupiter.api.io.TempDir;
 public class TestRowLineageMetadata {
   @Parameters(name = "formatVersion = {0}")
   private static List<Integer> formatVersion() {
-    return TestHelpers.ALL_VERSIONS;
+    return TestHelpers.V3_AND_ABOVE;
   }
 
   @Parameter private int formatVersion;
@@ -111,8 +110,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testSnapshotAddition() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     long newRows = 30L;
 
     TableMetadata base = baseMetadata();
@@ -147,8 +144,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testInvalidSnapshotAddition() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     Long newRows = 30L;
 
     TableMetadata base = baseMetadata();
@@ -178,8 +173,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testFastAppend() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     TestTables.TestTable table =
         TestTables.create(
             tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
@@ -199,8 +192,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testAppend() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     TestTables.TestTable table =
         TestTables.create(
             tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
@@ -220,7 +211,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testAppendBranch() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
     // Appends to a branch should still change last-row-id even if not on main, these changes
     // should also affect commits to main
 
@@ -253,8 +243,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testDeletes() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     TestTables.TestTable table =
         TestTables.create(
             tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
@@ -279,8 +267,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testPositionDeletes() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     TestTables.TestTable table =
         TestTables.create(
             tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
@@ -317,8 +303,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testEqualityDeletes() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     TestTables.TestTable table =
         TestTables.create(
             tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
@@ -352,8 +336,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testReplace() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     TestTables.TestTable table =
         TestTables.create(
             tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
@@ -382,8 +364,6 @@ public class TestRowLineageMetadata {
 
   @TestTemplate
   public void testMetadataRewrite() {
-    assumeThat(formatVersion).isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE);
-
     TestTables.TestTable table =
         TestTables.create(
             tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
diff --git a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
index 3fc691ce94..6318b39bf9 100644
--- a/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
+++ b/core/src/test/java/org/apache/iceberg/TestScansAndSchemaEvolution.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.TestHelpers.ALL_VERSIONS;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -25,7 +26,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import org.apache.avro.generic.GenericData;
@@ -54,8 +54,8 @@ public class TestScansAndSchemaEvolution {
       PartitionSpec.builderFor(SCHEMA).identity("part").build();
 
   @Parameters(name = "formatVersion = {0}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(1, 2, 3);
+  protected static List<Integer> formatVersions() {
+    return ALL_VERSIONS;
   }
 
   @Parameter private int formatVersion;
diff --git a/core/src/test/java/org/apache/iceberg/TestSortOrder.java b/core/src/test/java/org/apache/iceberg/TestSortOrder.java
index 521320ea23..b130f75a3c 100644
--- a/core/src/test/java/org/apache/iceberg/TestSortOrder.java
+++ b/core/src/test/java/org/apache/iceberg/TestSortOrder.java
@@ -20,6 +20,7 @@ package org.apache.iceberg;
 
 import static org.apache.iceberg.NullOrder.NULLS_FIRST;
 import static org.apache.iceberg.NullOrder.NULLS_LAST;
+import static org.apache.iceberg.TestHelpers.ALL_VERSIONS;
 import static org.apache.iceberg.expressions.Expressions.bucket;
 import static org.apache.iceberg.expressions.Expressions.truncate;
 import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -28,7 +29,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -70,8 +70,8 @@ public class TestSortOrder {
   @TempDir private File tableDir;
 
   @Parameters(name = "formatVersion = {0}")
-  protected static List<Object> parameters() {
-    return Arrays.asList(1, 2, 3);
+  protected static List<Integer> formatVersions() {
+    return ALL_VERSIONS;
   }
 
   @Parameter private int formatVersion;
diff --git a/core/src/test/resources/TableMetadataUnsupportedVersion.json b/core/src/test/resources/TableMetadataUnsupportedVersion.json
index c40a0c9cd5..58481d1d24 100644
--- a/core/src/test/resources/TableMetadataUnsupportedVersion.json
+++ b/core/src/test/resources/TableMetadataUnsupportedVersion.json
@@ -1,5 +1,5 @@
 {
-  "format-version": 4,
+  "format-version": 42,
   "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
   "location": "s3://bucket/test/location",
   "last-updated-ms": 1602638573874,

Reply via email to