This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f1fa2b6a8 Spark: Backport moving Spark to use the new FormatModel API (#15355)
0f1fa2b6a8 is described below

commit 0f1fa2b6a88389a2cfd0734e83a1d9b13becbfab
Author: pvary <[email protected]>
AuthorDate: Wed Feb 18 17:42:09 2026 +0100

    Spark: Backport moving Spark to use the new FormatModel API (#15355)
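
    For reference, a minimal sketch of the migrated call pattern, assembled from
    the call sites changed in this diff. The names inputFile, outputFile, schema
    and rows are placeholders, not Iceberg API; only builder methods that appear
    in the diff below are shown, and the needed imports (FileFormat, Files,
    PartitionSpec, EncryptedFiles, FormatModelRegistry, CloseableIterable,
    DataWriter, InternalRow) are the same ones the diff adds.

        // read path: FormatModelRegistry.readBuilder(...) replaces
        // Parquet.read(...) plus an explicit createReaderFunc(...)
        try (CloseableIterable<InternalRow> rows =
            FormatModelRegistry.readBuilder(FileFormat.PARQUET, InternalRow.class, inputFile)
                .project(schema)
                .build()) {
          for (InternalRow row : rows) {
            // consume the row
          }
        }

        // write path: dataWriteBuilder(...) replaces
        // Parquet.write(...).createWriterFunc(...)
        try (DataWriter<InternalRow> writer =
            FormatModelRegistry.dataWriteBuilder(
                    FileFormat.PARQUET,
                    InternalRow.class,
                    EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(outputFile)))
                .schema(schema)
                .spec(PartitionSpec.unpartitioned())
                .build()) {
          writer.write(rows); // as in the benchmark changes below
        }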
---
 .../SparkParquetReadersFlatDataBenchmark.java      |  10 +-
 .../SparkParquetReadersNestedDataBenchmark.java    |  10 +-
 .../SparkParquetWritersFlatDataBenchmark.java      |  20 +-
 .../SparkParquetWritersNestedDataBenchmark.java    |  20 +-
 .../spark/actions/RewriteTablePathSparkAction.java |  90 ++++-----
 .../iceberg/spark/data/SparkParquetWriters.java    |  23 ++-
 .../vectorized/VectorizedSparkParquetReaders.java  |  11 +-
 .../iceberg/spark/source/BaseBatchReader.java      | 102 +++-------
 .../apache/iceberg/spark/source/BaseRowReader.java |  73 +------
 .../spark/source/SparkFileWriterFactory.java       | 222 +++++++++++----------
 .../iceberg/spark/source/SparkFormatModels.java    |  89 +++++++++
 .../SparkParquetReadersFlatDataBenchmark.java      |  10 +-
 .../SparkParquetReadersNestedDataBenchmark.java    |  10 +-
 .../SparkParquetWritersFlatDataBenchmark.java      |  20 +-
 .../SparkParquetWritersNestedDataBenchmark.java    |  20 +-
 .../spark/actions/RewriteTablePathSparkAction.java |  90 ++++-----
 .../iceberg/spark/data/SparkParquetWriters.java    |  23 ++-
 .../vectorized/VectorizedSparkParquetReaders.java  |  11 +-
 .../iceberg/spark/source/BaseBatchReader.java      | 102 +++-------
 .../apache/iceberg/spark/source/BaseRowReader.java |  73 +------
 .../spark/source/SparkFileWriterFactory.java       | 222 +++++++++++----------
 .../iceberg/spark/source/SparkFormatModels.java    |  89 +++++++++
 .../SparkParquetReadersFlatDataBenchmark.java      |  10 +-
 .../SparkParquetReadersNestedDataBenchmark.java    |  10 +-
 .../SparkParquetWritersFlatDataBenchmark.java      |  20 +-
 .../SparkParquetWritersNestedDataBenchmark.java    |  20 +-
 .../spark/actions/RewriteTablePathSparkAction.java |  90 ++++-----
 .../iceberg/spark/data/SparkParquetWriters.java    |  23 ++-
 .../vectorized/VectorizedSparkParquetReaders.java  |  11 +-
 .../iceberg/spark/source/BaseBatchReader.java      | 102 +++-------
 .../apache/iceberg/spark/source/BaseRowReader.java |  73 +------
 .../spark/source/SparkFileWriterFactory.java       | 222 +++++++++++----------
 .../iceberg/spark/source/SparkFormatModels.java    |  89 +++++++++
 33 files changed, 1053 insertions(+), 957 deletions(-)

diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
index e65e2fb576..da520a84a6 100644
--- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
+++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -25,9 +25,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -113,9 +115,9 @@ public class SparkParquetReadersFlatDataBenchmark {
   @Threads(1)
   public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
@@ -171,9 +173,9 @@ public class SparkParquetReadersFlatDataBenchmark {
   @Threads(1)
   public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(PROJECTED_SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
index 3520c9b474..5c2902b195 100644
--- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
+++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -25,9 +25,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -111,9 +113,9 @@ public class SparkParquetReadersNestedDataBenchmark {
   @Threads(1)
   public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
@@ -169,9 +171,9 @@ public class SparkParquetReadersNestedDataBenchmark {
   @Threads(1)
   public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(PROJECTED_SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
index f104b8b88b..25a08e7597 100644
--- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
+++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -23,13 +23,17 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.io.IOException;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -95,15 +99,16 @@ public class SparkParquetWritersFlatDataBenchmark {
   @Benchmark
   @Threads(1)
   public void writeUsingIcebergWriter() throws IOException {
-    try (FileAppender<InternalRow> writer =
-        Parquet.write(Files.localOutput(dataFile))
-            .createWriterFunc(
-                msgType ->
-                    SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
+    try (DataWriter<InternalRow> writer =
+        FormatModelRegistry.dataWriteBuilder(
+                FileFormat.PARQUET,
+                InternalRow.class,
+                EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
             .schema(SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
             .build()) {
 
-      writer.addAll(rows);
+      writer.write(rows);
     }
   }
 
@@ -121,6 +126,7 @@ public class SparkParquetWritersFlatDataBenchmark {
             .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
             .set("spark.sql.caseSensitive", "false")
             .set("spark.sql.parquet.fieldId.write.enabled", "false")
+            .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
             .schema(SCHEMA)
             .build()) {
 
diff --git a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
index e375d1c56a..49645ff31f 100644
--- a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
+++ b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -23,13 +23,17 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.io.IOException;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -95,15 +99,16 @@ public class SparkParquetWritersNestedDataBenchmark {
   @Benchmark
   @Threads(1)
   public void writeUsingIcebergWriter() throws IOException {
-    try (FileAppender<InternalRow> writer =
-        Parquet.write(Files.localOutput(dataFile))
-            .createWriterFunc(
-                msgType ->
-                    SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
+    try (DataWriter<InternalRow> writer =
+        FormatModelRegistry.dataWriteBuilder(
+                FileFormat.PARQUET,
+                InternalRow.class,
+                EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
             .schema(SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
             .build()) {
 
-      writer.addAll(rows);
+      writer.write(rows);
     }
   }
 
@@ -121,6 +126,7 @@ public class SparkParquetWritersNestedDataBenchmark {
             .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
             .set("spark.sql.caseSensitive", "false")
             .set("spark.sql.parquet.fieldId.write.enabled", "false")
+            .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
             .schema(SCHEMA)
             .build()) {
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index d6a13bcd51..674d238b3a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -52,13 +52,12 @@ import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.avro.PlannedDataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.FileIO;
@@ -719,32 +718,10 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
 
   private static CloseableIterable<Record> positionDeletesReader(
       InputFile inputFile, FileFormat format, PartitionSpec spec) {
-    Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema());
-    switch (format) {
-      case AVRO:
-        return Avro.read(inputFile)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
-            .build();
-
-      case PARQUET:
-        return Parquet.read(inputFile)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(
-                fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
-            .build();
-
-      case ORC:
-        return ORC.read(inputFile)
-            .project(deleteSchema)
-            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
-            .build();
-
-      default:
-        throw new UnsupportedOperationException("Unsupported file format: " + format);
-    }
+    return FormatModelRegistry.readBuilder(format, Record.class, inputFile)
+        .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema()))
+        .reuseContainers()
+        .build();
   }
 
   private static PositionDeleteWriter<Record> positionDeletesWriter(
@@ -754,30 +731,37 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       StructLike partition,
       Schema rowSchema)
       throws IOException {
-    switch (format) {
-      case AVRO:
-        return Avro.writeDeletes(outputFile)
-            .createWriterFunc(DataWriter::create)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      case PARQUET:
-        return Parquet.writeDeletes(outputFile)
-            .createWriterFunc(GenericParquetWriter::create)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      case ORC:
-        return ORC.writeDeletes(outputFile)
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      default:
-        throw new UnsupportedOperationException("Unsupported file format: " + format);
+    if (rowSchema == null) {
+      return FormatModelRegistry.<Record>positionDeleteWriteBuilder(
+              format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
+          .partition(partition)
+          .spec(spec)
+          .build();
+    } else {
+      return switch (format) {
+        case AVRO ->
+            Avro.writeDeletes(outputFile)
+                .createWriterFunc(DataWriter::create)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        case PARQUET ->
+            Parquet.writeDeletes(outputFile)
+                .createWriterFunc(GenericParquetWriter::create)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        case ORC ->
+            ORC.writeDeletes(outputFile)
+                .createWriterFunc(GenericOrcWriter::buildWriter)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        default -> throw new UnsupportedOperationException("Unsupported file format: " + format);
+      };
     }
   }
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index ffda57be2b..6a99912e1e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.UUID;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -34,6 +35,7 @@ import org.apache.iceberg.parquet.ParquetValueWriters.RepeatedKeyValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters.RepeatedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
 import org.apache.iceberg.util.UUIDUtil;
@@ -61,10 +63,27 @@ import org.apache.spark.unsafe.types.UTF8String;
 public class SparkParquetWriters {
   private SparkParquetWriters() {}
 
-  @SuppressWarnings("unchecked")
   public static <T> ParquetValueWriter<T> buildWriter(StructType dfSchema, MessageType type) {
+    return buildWriter(null, type, dfSchema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(
+      Schema icebergSchema, MessageType type, StructType dfSchema) {
+    return (ParquetValueWriter<T>)
+        ParquetWithSparkSchemaVisitor.visit(
+            dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
+            type,
+            new WriteBuilder(type));
+  }
+
+  public static <T> ParquetValueWriter<T> buildWriter(
+      StructType dfSchema, MessageType type, Schema icebergSchema) {
     return (ParquetValueWriter<T>)
-        ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type));
+        ParquetWithSparkSchemaVisitor.visit(
+            dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
+            type,
+            new WriteBuilder(type));
   }
 
   private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 8e25e81a05..55f9fc1768 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -30,6 +30,8 @@ import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +77,7 @@ public class VectorizedSparkParquetReaders {
     return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
   }
 
-  public static CometColumnarBatchReader buildCometReader(
+  public static VectorizedReader<ColumnarBatch> buildCometReader(
       Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
     return (CometColumnarBatchReader)
         TypeWithSchemaVisitor.visit(
@@ -88,6 +90,13 @@ public class VectorizedSparkParquetReaders {
                 readers -> new CometColumnarBatchReader(readers, expectedSchema)));
   }
 
+  /** A subclass of ColumnarBatch to identify Comet readers. */
+  public static class CometColumnarBatch extends ColumnarBatch {
+    public CometColumnarBatch(ColumnVector[] columns) {
+      super(columns);
+    }
+  }
+
   // enables unsafe memory access to avoid costly checks to see if index is within bounds
   // as long as it is not configured explicitly (see BoundsChecking in Arrow)
   private static void enableUnsafeMemoryAccess() {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index ff30f29aea..89c03a4c2b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -19,7 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -29,21 +28,18 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.OrcBatchReadConf;
 import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter;
 import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil;
 import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnVector;
@@ -76,79 +72,37 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
       Expression residual,
       Map<Integer, ?> idToConstant,
       @Nonnull SparkDeleteFilter deleteFilter) {
-    CloseableIterable<ColumnarBatch> iterable;
-    switch (format) {
-      case PARQUET:
-        iterable =
-            newParquetIterable(
-                inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema());
-        break;
-      case ORC:
-        iterable = newOrcIterable(inputFile, start, length, residual, idToConstant);
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            "Format: " + format + " not supported for batched reads");
+    Class<? extends ColumnarBatch> readType =
+        useComet() ? VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class;
+    ReadBuilder<ColumnarBatch, ?> readBuilder =
+        FormatModelRegistry.readBuilder(format, readType, inputFile);
+
+    if (parquetConf != null) {
+      readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize());
+    } else if (orcConf != null) {
+      readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize());
     }
 
-    return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
-  }
+    CloseableIterable<ColumnarBatch> iterable =
+        readBuilder
+            .project(deleteFilter.requiredSchema())
+            .idToConstant(idToConstant)
+            .split(start, length)
+            .filter(residual)
+            .caseSensitive(caseSensitive())
+            // Spark eagerly consumes the batches. So the underlying memory allocated could be
+            // reused without worrying about subsequent reads clobbering over each other. This
+            // improves read performance as every batch read doesn't have to pay the cost of
+            // allocating memory.
+            .reuseContainers()
+            .withNameMapping(nameMapping())
+            .build();
 
-  private CloseableIterable<ColumnarBatch> newParquetIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant,
-      Schema requiredSchema) {
-    return Parquet.read(inputFile)
-        .project(requiredSchema)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema -> {
-              if (parquetConf.readerType() == ParquetReaderType.COMET) {
-                return VectorizedSparkParquetReaders.buildCometReader(
-                    requiredSchema, fileSchema, idToConstant);
-              } else {
-                return VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema, fileSchema, idToConstant);
-              }
-            })
-        .recordsPerBatch(parquetConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
-        // without worrying about subsequent reads clobbering over each other. This improves
-        // read performance as every batch read doesn't have to pay the cost of allocating memory.
-        .reuseContainers()
-        .withNameMapping(nameMapping())
-        .build();
+    return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
   }
 
-  private CloseableIterable<ColumnarBatch> newOrcIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant) {
-    Set<Integer> constantFieldIds = idToConstant.keySet();
-    Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
-    Sets.SetView<Integer> constantAndMetadataFieldIds =
-        Sets.union(constantFieldIds, metadataFieldIds);
-    Schema schemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
-
-    return ORC.read(inputFile)
-        .project(schemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema ->
-                VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
-        .recordsPerBatch(orcConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
+  private boolean useComet() {
+    return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET;
   }
 
   @VisibleForTesting
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index c12931e786..53d44e760a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -20,22 +20,15 @@ package org.apache.iceberg.spark.source;
 
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
@@ -58,69 +51,15 @@ abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow,
       Expression residual,
       Schema projection,
       Map<Integer, ?> idToConstant) {
-    switch (format) {
-      case PARQUET:
-        return newParquetIterable(file, start, length, residual, projection, idToConstant);
-
-      case AVRO:
-        return newAvroIterable(file, start, length, projection, idToConstant);
-
-      case ORC:
-        return newOrcIterable(file, start, length, residual, projection, idToConstant);
-
-      default:
-        throw new UnsupportedOperationException("Cannot read unknown format: " + format);
-    }
-  }
-
-  private CloseableIterable<InternalRow> newAvroIterable(
-      InputFile file, long start, long length, Schema projection, Map<Integer, ?> idToConstant) {
-    return Avro.read(file)
-        .reuseContainers()
+    ReadBuilder<InternalRow, ?> reader =
+        FormatModelRegistry.readBuilder(format, InternalRow.class, file);
+    return reader
         .project(projection)
-        .split(start, length)
-        .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant))
-        .withNameMapping(nameMapping())
-        .build();
-  }
-
-  private CloseableIterable<InternalRow> newParquetIterable(
-      InputFile file,
-      long start,
-      long length,
-      Expression residual,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    return Parquet.read(file)
+        .idToConstant(idToConstant)
         .reuseContainers()
         .split(start, length)
-        .project(readSchema)
-        .createReaderFunc(
-            fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
-        .filter(residual)
         .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
-  }
-
-  private CloseableIterable<InternalRow> newOrcIterable(
-      InputFile file,
-      long start,
-      long length,
-      Expression residual,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    Schema readSchemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(
-            readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    return ORC.read(file)
-        .project(readSchemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createReaderFunc(
-            readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
         .filter(residual)
-        .caseSensitive(caseSensitive())
         .withNameMapping(nameMapping())
         .build();
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index a93db17e4a..2b3bf73d56 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -23,13 +23,20 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
@@ -40,14 +47,20 @@ import org.apache.iceberg.spark.data.SparkAvroWriter;
 import org.apache.iceberg.spark.data.SparkOrcWriter;
 import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
-
-class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
-  private StructType dataSparkType;
-  private StructType equalityDeleteSparkType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow, StructType> {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkFileWriterFactory.class);
+  // We need to use old writers to write position deletes with row data, which is a deprecated
+  // feature.
+  private final boolean useDeprecatedPositionDeleteWriter;
   private StructType positionDeleteSparkType;
+  private final Schema positionDeleteRowSchema;
+  private final Table table;
+  private final FileFormat format;
   private final Map<String, String> writeProperties;
 
   /**
@@ -75,18 +88,26 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     super(
         table,
         dataFileFormat,
+        InternalRow.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        positionDeleteRowSchema);
+        writeProperties,
+        useOrConvert(dataSparkType, dataSchema),
+        useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
 
-    this.dataSparkType = dataSparkType;
-    this.equalityDeleteSparkType = equalityDeleteSparkType;
-    this.positionDeleteSparkType = positionDeleteSparkType;
+    this.table = table;
+    this.format = dataFileFormat;
     this.writeProperties = writeProperties != null ? writeProperties : 
ImmutableMap.of();
+    this.positionDeleteRowSchema = positionDeleteRowSchema;
+    this.positionDeleteSparkType = positionDeleteSparkType;
+    this.useDeprecatedPositionDeleteWriter =
+        positionDeleteRowSchema != null
+            || (positionDeleteSparkType != null
+                && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined());
   }
 
   SparkFileWriterFactory(
@@ -105,119 +126,110 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     super(
         table,
         dataFileFormat,
+        InternalRow.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        ImmutableMap.of());
+        writeProperties,
+        useOrConvert(dataSparkType, dataSchema),
+        useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
 
-    this.dataSparkType = dataSparkType;
-    this.equalityDeleteSparkType = equalityDeleteSparkType;
-    this.positionDeleteSparkType = null;
+    this.table = table;
+    this.format = dataFileFormat;
     this.writeProperties = writeProperties != null ? writeProperties : 
ImmutableMap.of();
+    this.positionDeleteRowSchema = null;
+    this.useDeprecatedPositionDeleteWriter = false;
   }
 
   static Builder builderFor(Table table) {
     return new Builder(table);
   }
 
-  @Override
-  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
-    boolean withRow =
-        positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
-    if (withRow) {
-      // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos
-      StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
-      StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
-      builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
-    }
-
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
-    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
-    builder.setAll(writeProperties);
-  }
-
-  private StructType dataSparkType() {
-    if (dataSparkType == null) {
-      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
-      this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
-    }
-
-    return dataSparkType;
-  }
-
-  private StructType equalityDeleteSparkType() {
-    if (equalityDeleteSparkType == null) {
-      Preconditions.checkNotNull(
-          equalityDeleteRowSchema(), "Equality delete schema must not be null");
-      this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema());
-    }
-
-    return equalityDeleteSparkType;
-  }
-
   private StructType positionDeleteSparkType() {
     if (positionDeleteSparkType == null) {
       // wrap the optional row schema into the position delete schema containing path and position
-      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema);
+      this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema);
     }
 
     return positionDeleteSparkType;
   }
 
+  @Override
+  public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
+      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    if (!useDeprecatedPositionDeleteWriter) {
+      return super.newPositionDeleteWriter(file, spec, partition);
+    } else {
+      LOG.warn("Position deletes with deleted rows are deprecated and will be removed in 1.12.0.");
+      Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties();
+      MetricsConfig metricsConfig =
+          table == null
+              ? MetricsConfig.forPositionDelete()
+              : MetricsConfig.forPositionDelete(table);
+
+      try {
+        return switch (format) {
+          case AVRO ->
+              Avro.writeDeletes(file)
+                  .createWriterFunc(
+                      ignored ->
+                          new SparkAvroWriter(
+                              (StructType)
+                                  positionDeleteSparkType()
+                                      .apply(DELETE_FILE_ROW_FIELD_NAME)
+                                      .dataType()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case ORC ->
+              ORC.writeDeletes(file)
+                  .createWriterFunc(SparkOrcWriter::new)
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case PARQUET ->
+              Parquet.writeDeletes(file)
+                  .createWriterFunc(
+                      msgType ->
+                          SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType))
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .metricsConfig(metricsConfig)
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          default ->
+              throw new UnsupportedOperationException(
+                  "Cannot write pos-deletes for unsupported file format: " + format);
+        };
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to create new position delete writer", e);
+      }
+    }
+  }
+
   static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
@@ -340,4 +352,14 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
           writeProperties);
     }
   }
+
+  private static StructType useOrConvert(StructType sparkType, Schema schema) {
+    if (sparkType != null) {
+      return sparkType;
+    } else if (schema != null) {
+      return SparkSchemaUtil.convert(schema);
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
new file mode 100644
index 0000000000..677f2e950b
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class SparkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            SparkParquetWriters::buildWriter,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            VectorizedSparkParquetReaders.CometColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildCometReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkOrcWriter(icebergSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new SparkOrcReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkOrcReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private SparkFormatModels() {}
+}
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
index f2f9488b6e..1c7505eab4 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -25,9 +25,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -113,9 +115,9 @@ public class SparkParquetReadersFlatDataBenchmark {
   @Threads(1)
   public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
@@ -171,9 +173,9 @@ public class SparkParquetReadersFlatDataBenchmark {
   @Threads(1)
   public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(PROJECTED_SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
index b19ab683d6..42454c10ac 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -25,9 +25,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -111,9 +113,9 @@ public class SparkParquetReadersNestedDataBenchmark {
   @Threads(1)
   public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
@@ -169,9 +171,9 @@ public class SparkParquetReadersNestedDataBenchmark {
   @Threads(1)
   public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(PROJECTED_SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
index 87289b3cba..f77454c4b2 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -23,13 +23,17 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.io.IOException;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -95,15 +99,16 @@ public class SparkParquetWritersFlatDataBenchmark {
   @Benchmark
   @Threads(1)
   public void writeUsingIcebergWriter() throws IOException {
-    try (FileAppender<InternalRow> writer =
-        Parquet.write(Files.localOutput(dataFile))
-            .createWriterFunc(
-                msgType ->
-                    SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
+    try (DataWriter<InternalRow> writer =
+        FormatModelRegistry.dataWriteBuilder(
+                FileFormat.PARQUET,
+                InternalRow.class,
+                EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
             .schema(SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
             .build()) {
 
-      writer.addAll(rows);
+      writer.write(rows);
     }
   }
 
@@ -121,6 +126,7 @@ public class SparkParquetWritersFlatDataBenchmark {
             .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
             .set("spark.sql.caseSensitive", "false")
             .set("spark.sql.parquet.fieldId.write.enabled", "false")
+            .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
             .schema(SCHEMA)
             .build()) {
 
diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
index dd913a2756..a732d36526 100644
--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
+++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -23,13 +23,17 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.io.IOException;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -95,15 +99,16 @@ public class SparkParquetWritersNestedDataBenchmark {
   @Benchmark
   @Threads(1)
   public void writeUsingIcebergWriter() throws IOException {
-    try (FileAppender<InternalRow> writer =
-        Parquet.write(Files.localOutput(dataFile))
-            .createWriterFunc(
-                msgType ->
-                    SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
+    try (DataWriter<InternalRow> writer =
+        FormatModelRegistry.dataWriteBuilder(
+                FileFormat.PARQUET,
+                InternalRow.class,
+                EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
             .schema(SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
             .build()) {
 
-      writer.addAll(rows);
+      writer.write(rows);
     }
   }
 
@@ -121,6 +126,7 @@ public class SparkParquetWritersNestedDataBenchmark {
             .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
             .set("spark.sql.caseSensitive", "false")
             .set("spark.sql.parquet.fieldId.write.enabled", "false")
+            .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
             .schema(SCHEMA)
             .build()) {
 
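The matching write path, as used in the two writer benchmarks above, is sketched below; it mirrors the dataWriteBuilder calls from this patch, and SCHEMA, dataFile, and rows are assumed to be the benchmark's existing fields.

    // Sketch of the registry-based data write used above; only the
    // FormatModelRegistry and EncryptedFiles calls are taken from the patch itself.
    try (DataWriter<InternalRow> writer =
        FormatModelRegistry.dataWriteBuilder(
                FileFormat.PARQUET,
                InternalRow.class,
                EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
            .schema(SCHEMA)
            .spec(PartitionSpec.unpartitioned())
            .build()) {
      writer.write(rows);
    }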
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index d6a13bcd51..674d238b3a 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -52,13 +52,12 @@ import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.avro.PlannedDataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.FileIO;
@@ -719,32 +718,10 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
 
   private static CloseableIterable<Record> positionDeletesReader(
       InputFile inputFile, FileFormat format, PartitionSpec spec) {
-    Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema());
-    switch (format) {
-      case AVRO:
-        return Avro.read(inputFile)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
-            .build();
-
-      case PARQUET:
-        return Parquet.read(inputFile)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(
-                fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
-            .build();
-
-      case ORC:
-        return ORC.read(inputFile)
-            .project(deleteSchema)
-            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
-            .build();
-
-      default:
-        throw new UnsupportedOperationException("Unsupported file format: " + format);
-    }
+    return FormatModelRegistry.readBuilder(format, Record.class, inputFile)
+        .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema()))
+        .reuseContainers()
+        .build();
   }
 
   private static PositionDeleteWriter<Record> positionDeletesWriter(
@@ -754,30 +731,37 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       StructLike partition,
       Schema rowSchema)
       throws IOException {
-    switch (format) {
-      case AVRO:
-        return Avro.writeDeletes(outputFile)
-            .createWriterFunc(DataWriter::create)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      case PARQUET:
-        return Parquet.writeDeletes(outputFile)
-            .createWriterFunc(GenericParquetWriter::create)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      case ORC:
-        return ORC.writeDeletes(outputFile)
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      default:
-        throw new UnsupportedOperationException("Unsupported file format: " + format);
+    if (rowSchema == null) {
+      return FormatModelRegistry.<Record>positionDeleteWriteBuilder(
+              format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
+          .partition(partition)
+          .spec(spec)
+          .build();
+    } else {
+      return switch (format) {
+        case AVRO ->
+            Avro.writeDeletes(outputFile)
+                .createWriterFunc(DataWriter::create)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        case PARQUET ->
+            Parquet.writeDeletes(outputFile)
+                .createWriterFunc(GenericParquetWriter::create)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        case ORC ->
+            ORC.writeDeletes(outputFile)
+                .createWriterFunc(GenericOrcWriter::buildWriter)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        default -> throw new UnsupportedOperationException("Unsupported file format: " + format);
+      };
     }
   }
 
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index 8ffe26dc33..dda634a46f 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.UUID;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -34,6 +35,7 @@ import org.apache.iceberg.parquet.ParquetValueWriters.RepeatedKeyValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters.RepeatedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
 import org.apache.iceberg.util.UUIDUtil;
@@ -61,10 +63,27 @@ import org.apache.spark.unsafe.types.UTF8String;
 public class SparkParquetWriters {
   private SparkParquetWriters() {}
 
-  @SuppressWarnings("unchecked")
   public static <T> ParquetValueWriter<T> buildWriter(StructType dfSchema, MessageType type) {
+    return buildWriter(null, type, dfSchema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(
+      Schema icebergSchema, MessageType type, StructType dfSchema) {
+    return (ParquetValueWriter<T>)
+        ParquetWithSparkSchemaVisitor.visit(
+            dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
+            type,
+            new WriteBuilder(type));
+  }
+
+  public static <T> ParquetValueWriter<T> buildWriter(
+      StructType dfSchema, MessageType type, Schema icebergSchema) {
     return (ParquetValueWriter<T>)
-        ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type));
+        ParquetWithSparkSchemaVisitor.visit(
+            dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
+            type,
+            new WriteBuilder(type));
   }
 
   private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 8e25e81a05..55f9fc1768 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -30,6 +30,8 @@ import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +77,7 @@ public class VectorizedSparkParquetReaders {
     return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
   }
 
-  public static CometColumnarBatchReader buildCometReader(
+  public static VectorizedReader<ColumnarBatch> buildCometReader(
       Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
     return (CometColumnarBatchReader)
         TypeWithSchemaVisitor.visit(
@@ -88,6 +90,13 @@ public class VectorizedSparkParquetReaders {
                 readers -> new CometColumnarBatchReader(readers, expectedSchema)));
   }
 
+  /** A subclass of ColumnarBatch to identify Comet readers. */
+  public static class CometColumnarBatch extends ColumnarBatch {
+    public CometColumnarBatch(ColumnVector[] columns) {
+      super(columns);
+    }
+  }
+
   // enables unsafe memory access to avoid costly checks to see if index is within bounds
   // as long as it is not configured explicitly (see BoundsChecking in Arrow)
   private static void enableUnsafeMemoryAccess() {
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index ff30f29aea..89c03a4c2b 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -19,7 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -29,21 +28,18 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.OrcBatchReadConf;
 import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter;
 import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil;
 import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnVector;
@@ -76,79 +72,37 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
       Expression residual,
       Map<Integer, ?> idToConstant,
       @Nonnull SparkDeleteFilter deleteFilter) {
-    CloseableIterable<ColumnarBatch> iterable;
-    switch (format) {
-      case PARQUET:
-        iterable =
-            newParquetIterable(
-                inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema());
-        break;
-      case ORC:
-        iterable = newOrcIterable(inputFile, start, length, residual, idToConstant);
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            "Format: " + format + " not supported for batched reads");
+    Class<? extends ColumnarBatch> readType =
+        useComet() ? VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class;
+    ReadBuilder<ColumnarBatch, ?> readBuilder =
+        FormatModelRegistry.readBuilder(format, readType, inputFile);
+
+    if (parquetConf != null) {
+      readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize());
+    } else if (orcConf != null) {
+      readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize());
     }
 
-    return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
-  }
+    CloseableIterable<ColumnarBatch> iterable =
+        readBuilder
+            .project(deleteFilter.requiredSchema())
+            .idToConstant(idToConstant)
+            .split(start, length)
+            .filter(residual)
+            .caseSensitive(caseSensitive())
+            // Spark eagerly consumes the batches. So the underlying memory allocated could be
+            // reused without worrying about subsequent reads clobbering over each other. This
+            // improves read performance as every batch read doesn't have to pay the cost of
+            // allocating memory.
+            .reuseContainers()
+            .withNameMapping(nameMapping())
+            .build();
 
-  private CloseableIterable<ColumnarBatch> newParquetIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant,
-      Schema requiredSchema) {
-    return Parquet.read(inputFile)
-        .project(requiredSchema)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema -> {
-              if (parquetConf.readerType() == ParquetReaderType.COMET) {
-                return VectorizedSparkParquetReaders.buildCometReader(
-                    requiredSchema, fileSchema, idToConstant);
-              } else {
-                return VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema, fileSchema, idToConstant);
-              }
-            })
-        .recordsPerBatch(parquetConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
-        // without worrying about subsequent reads clobbering over each other. This improves
-        // read performance as every batch read doesn't have to pay the cost of allocating memory.
-        .reuseContainers()
-        .withNameMapping(nameMapping())
-        .build();
+    return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
   }
 
-  private CloseableIterable<ColumnarBatch> newOrcIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant) {
-    Set<Integer> constantFieldIds = idToConstant.keySet();
-    Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
-    Sets.SetView<Integer> constantAndMetadataFieldIds =
-        Sets.union(constantFieldIds, metadataFieldIds);
-    Schema schemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
-
-    return ORC.read(inputFile)
-        .project(schemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema ->
-                VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
-        .recordsPerBatch(orcConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
+  private boolean useComet() {
+    return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET;
   }
 
   @VisibleForTesting
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index c12931e786..53d44e760a 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -20,22 +20,15 @@ package org.apache.iceberg.spark.source;
 
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
@@ -58,69 +51,15 @@ abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow,
       Expression residual,
       Schema projection,
       Map<Integer, ?> idToConstant) {
-    switch (format) {
-      case PARQUET:
-        return newParquetIterable(file, start, length, residual, projection, idToConstant);
-
-      case AVRO:
-        return newAvroIterable(file, start, length, projection, idToConstant);
-
-      case ORC:
-        return newOrcIterable(file, start, length, residual, projection, idToConstant);
-
-      default:
-        throw new UnsupportedOperationException("Cannot read unknown format: " + format);
-    }
-  }
-
-  private CloseableIterable<InternalRow> newAvroIterable(
-      InputFile file, long start, long length, Schema projection, Map<Integer, ?> idToConstant) {
-    return Avro.read(file)
-        .reuseContainers()
+    ReadBuilder<InternalRow, ?> reader =
+        FormatModelRegistry.readBuilder(format, InternalRow.class, file);
+    return reader
         .project(projection)
-        .split(start, length)
-        .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant))
-        .withNameMapping(nameMapping())
-        .build();
-  }
-
-  private CloseableIterable<InternalRow> newParquetIterable(
-      InputFile file,
-      long start,
-      long length,
-      Expression residual,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    return Parquet.read(file)
+        .idToConstant(idToConstant)
         .reuseContainers()
         .split(start, length)
-        .project(readSchema)
-        .createReaderFunc(
-            fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
-        .filter(residual)
         .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
-  }
-
-  private CloseableIterable<InternalRow> newOrcIterable(
-      InputFile file,
-      long start,
-      long length,
-      Expression residual,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    Schema readSchemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(
-            readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    return ORC.read(file)
-        .project(readSchemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createReaderFunc(
-            readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
         .filter(residual)
-        .caseSensitive(caseSensitive())
         .withNameMapping(nameMapping())
         .build();
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index a93db17e4a..2b3bf73d56 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -23,13 +23,20 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
@@ -40,14 +47,20 @@ import org.apache.iceberg.spark.data.SparkAvroWriter;
 import org.apache.iceberg.spark.data.SparkOrcWriter;
 import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
-
-class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
-  private StructType dataSparkType;
-  private StructType equalityDeleteSparkType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow, StructType> {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkFileWriterFactory.class);
+  // We need to use old writers to write position deletes with row data, which is a deprecated
+  // feature.
+  private final boolean useDeprecatedPositionDeleteWriter;
   private StructType positionDeleteSparkType;
+  private final Schema positionDeleteRowSchema;
+  private final Table table;
+  private final FileFormat format;
   private final Map<String, String> writeProperties;
 
   /**
@@ -75,18 +88,26 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     super(
         table,
         dataFileFormat,
+        InternalRow.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        positionDeleteRowSchema);
+        writeProperties,
+        useOrConvert(dataSparkType, dataSchema),
+        useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
 
-    this.dataSparkType = dataSparkType;
-    this.equalityDeleteSparkType = equalityDeleteSparkType;
-    this.positionDeleteSparkType = positionDeleteSparkType;
+    this.table = table;
+    this.format = dataFileFormat;
     this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
+    this.positionDeleteRowSchema = positionDeleteRowSchema;
+    this.positionDeleteSparkType = positionDeleteSparkType;
+    this.useDeprecatedPositionDeleteWriter =
+        positionDeleteRowSchema != null
+            || (positionDeleteSparkType != null
+                && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined());
   }
 
   SparkFileWriterFactory(
@@ -105,119 +126,110 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     super(
         table,
         dataFileFormat,
+        InternalRow.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        ImmutableMap.of());
+        writeProperties,
+        useOrConvert(dataSparkType, dataSchema),
+        useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
 
-    this.dataSparkType = dataSparkType;
-    this.equalityDeleteSparkType = equalityDeleteSparkType;
-    this.positionDeleteSparkType = null;
+    this.table = table;
+    this.format = dataFileFormat;
     this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
+    this.positionDeleteRowSchema = null;
+    this.useDeprecatedPositionDeleteWriter = false;
   }
 
   static Builder builderFor(Table table) {
     return new Builder(table);
   }
 
-  @Override
-  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
-    boolean withRow =
-        positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
-    if (withRow) {
-      // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos
-      StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
-      StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
-      builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
-    }
-
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
-    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
-    builder.setAll(writeProperties);
-  }
-
-  private StructType dataSparkType() {
-    if (dataSparkType == null) {
-      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
-      this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
-    }
-
-    return dataSparkType;
-  }
-
-  private StructType equalityDeleteSparkType() {
-    if (equalityDeleteSparkType == null) {
-      Preconditions.checkNotNull(
-          equalityDeleteRowSchema(), "Equality delete schema must not be null");
-      this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema());
-    }
-
-    return equalityDeleteSparkType;
-  }
-
   private StructType positionDeleteSparkType() {
     if (positionDeleteSparkType == null) {
       // wrap the optional row schema into the position delete schema containing path and position
-      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema);
       this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema);
     }
 
     return positionDeleteSparkType;
   }
 
+  @Override
+  public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
+      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    if (!useDeprecatedPositionDeleteWriter) {
+      return super.newPositionDeleteWriter(file, spec, partition);
+    } else {
+      LOG.warn("Position deletes with deleted rows are deprecated and will be removed in 1.12.0.");
+      Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties();
+      MetricsConfig metricsConfig =
+          table == null
+              ? MetricsConfig.forPositionDelete()
+              : MetricsConfig.forPositionDelete(table);
+
+      try {
+        return switch (format) {
+          case AVRO ->
+              Avro.writeDeletes(file)
+                  .createWriterFunc(
+                      ignored ->
+                          new SparkAvroWriter(
+                              (StructType)
+                                  positionDeleteSparkType()
+                                      .apply(DELETE_FILE_ROW_FIELD_NAME)
+                                      .dataType()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case ORC ->
+              ORC.writeDeletes(file)
+                  .createWriterFunc(SparkOrcWriter::new)
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case PARQUET ->
+              Parquet.writeDeletes(file)
+                  .createWriterFunc(
+                      msgType ->
+                          SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType))
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .metricsConfig(metricsConfig)
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          default ->
+              throw new UnsupportedOperationException(
+                  "Cannot write pos-deletes for unsupported file format: " + format);
+        };
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to create new position delete writer", e);
+      }
+    }
+  }
+
   static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
@@ -340,4 +352,14 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
           writeProperties);
     }
   }
+
+  private static StructType useOrConvert(StructType sparkType, Schema schema) {
+    if (sparkType != null) {
+      return sparkType;
+    } else if (schema != null) {
+      return SparkSchemaUtil.convert(schema);
+    } else {
+      return null;
+    }
+  }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
new file mode 100644
index 0000000000..677f2e950b
--- /dev/null
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class SparkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            SparkParquetWriters::buildWriter,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            VectorizedSparkParquetReaders.CometColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildCometReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkOrcWriter(icebergSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new SparkOrcReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkOrcReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private SparkFormatModels() {}
+}
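The registrations in SparkFormatModels are what let the call sites in this commit resolve readers and writers by file format and engine class alone. A minimal hedged sketch of such a call site, assuming inputFile and projection are in scope and register() has already run:

    // Sketch only: resolves the Parquet InternalRow model registered above.
    CloseableIterable<InternalRow> rows =
        FormatModelRegistry.readBuilder(FileFormat.PARQUET, InternalRow.class, inputFile)
            .project(projection)
            .build();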
diff --git 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
index 3dbee5dfd0..9b7bbe0eb4 100644
--- 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
+++ 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -25,9 +25,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -113,9 +115,9 @@ public class SparkParquetReadersFlatDataBenchmark {
   @Threads(1)
   public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
@@ -171,9 +173,9 @@ public class SparkParquetReadersFlatDataBenchmark {
   @Threads(1)
   public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(PROJECTED_SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
diff --git 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
index 8487988d9e..e77191d5e5 100644
--- 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
+++ 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -25,9 +25,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -111,9 +113,9 @@ public class SparkParquetReadersNestedDataBenchmark {
   @Threads(1)
   public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
@@ -169,9 +171,9 @@ public class SparkParquetReadersNestedDataBenchmark {
   @Threads(1)
   public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
     try (CloseableIterable<InternalRow> rows =
-        Parquet.read(Files.localInput(dataFile))
+        FormatModelRegistry.readBuilder(
+                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
             .project(PROJECTED_SCHEMA)
-            .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
             .build()) {
 
       for (InternalRow row : rows) {
diff --git 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
index 47f0b72088..46fb96b8a3 100644
--- 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
+++ 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -23,13 +23,17 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.io.IOException;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -95,15 +99,16 @@ public class SparkParquetWritersFlatDataBenchmark {
   @Benchmark
   @Threads(1)
   public void writeUsingIcebergWriter() throws IOException {
-    try (FileAppender<InternalRow> writer =
-        Parquet.write(Files.localOutput(dataFile))
-            .createWriterFunc(
-                msgType ->
-                    SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
+    try (DataWriter<InternalRow> writer =
+        FormatModelRegistry.dataWriteBuilder(
+                FileFormat.PARQUET,
+                InternalRow.class,
+                EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
             .schema(SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
             .build()) {
 
-      writer.addAll(rows);
+      writer.write(rows);
     }
   }
 
@@ -121,6 +126,7 @@ public class SparkParquetWritersFlatDataBenchmark {
             .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
             .set("spark.sql.caseSensitive", "false")
             .set("spark.sql.parquet.fieldId.write.enabled", "false")
+            .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
             .schema(SCHEMA)
             .build()) {
 
diff --git 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
index 4df890d861..6cb5e23d0b 100644
--- 
a/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
+++ 
b/spark/v4.0/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -23,13 +23,17 @@ import static org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.File;
 import java.io.IOException;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -95,15 +99,16 @@ public class SparkParquetWritersNestedDataBenchmark {
   @Benchmark
   @Threads(1)
   public void writeUsingIcebergWriter() throws IOException {
-    try (FileAppender<InternalRow> writer =
-        Parquet.write(Files.localOutput(dataFile))
-            .createWriterFunc(
-                msgType ->
-                    SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
+    try (DataWriter<InternalRow> writer =
+        FormatModelRegistry.dataWriteBuilder(
+                FileFormat.PARQUET,
+                InternalRow.class,
+                EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
             .schema(SCHEMA)
+            .spec(PartitionSpec.unpartitioned())
             .build()) {
 
-      writer.addAll(rows);
+      writer.write(rows);
     }
   }
 
@@ -121,6 +126,7 @@ public class SparkParquetWritersNestedDataBenchmark {
             .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
             .set("spark.sql.caseSensitive", "false")
             .set("spark.sql.parquet.fieldId.write.enabled", "false")
+            .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
             .schema(SCHEMA)
             .build()) {
 
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index d6a13bcd51..674d238b3a 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -52,13 +52,12 @@ import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.avro.PlannedDataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.formats.FormatModelRegistry;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.FileIO;
@@ -719,32 +718,10 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
 
   private static CloseableIterable<Record> positionDeletesReader(
       InputFile inputFile, FileFormat format, PartitionSpec spec) {
-    Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema());
-    switch (format) {
-      case AVRO:
-        return Avro.read(inputFile)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
-            .build();
-
-      case PARQUET:
-        return Parquet.read(inputFile)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(
-                fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
-            .build();
-
-      case ORC:
-        return ORC.read(inputFile)
-            .project(deleteSchema)
-            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
-            .build();
-
-      default:
-        throw new UnsupportedOperationException("Unsupported file format: " + format);
-    }
+    return FormatModelRegistry.readBuilder(format, Record.class, inputFile)
+        .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema()))
+        .reuseContainers()
+        .build();
   }
 
   private static PositionDeleteWriter<Record> positionDeletesWriter(
@@ -754,30 +731,37 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
       StructLike partition,
       Schema rowSchema)
       throws IOException {
-    switch (format) {
-      case AVRO:
-        return Avro.writeDeletes(outputFile)
-            .createWriterFunc(DataWriter::create)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      case PARQUET:
-        return Parquet.writeDeletes(outputFile)
-            .createWriterFunc(GenericParquetWriter::create)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      case ORC:
-        return ORC.writeDeletes(outputFile)
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .withPartition(partition)
-            .rowSchema(rowSchema)
-            .withSpec(spec)
-            .buildPositionWriter();
-      default:
-        throw new UnsupportedOperationException("Unsupported file format: " + format);
+    if (rowSchema == null) {
+      return FormatModelRegistry.<Record>positionDeleteWriteBuilder(
+              format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
+          .partition(partition)
+          .spec(spec)
+          .build();
+    } else {
+      return switch (format) {
+        case AVRO ->
+            Avro.writeDeletes(outputFile)
+                .createWriterFunc(DataWriter::create)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        case PARQUET ->
+            Parquet.writeDeletes(outputFile)
+                .createWriterFunc(GenericParquetWriter::create)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        case ORC ->
+            ORC.writeDeletes(outputFile)
+                .createWriterFunc(GenericOrcWriter::buildWriter)
+                .withPartition(partition)
+                .rowSchema(rowSchema)
+                .withSpec(spec)
+                .buildPositionWriter();
+        default -> throw new UnsupportedOperationException("Unsupported file format: " + format);
+      };
     }
   }
 
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index 8bdfe7c3a8..3ff5ef9c57 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.iceberg.FieldMetrics;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -41,6 +42,7 @@ import org.apache.iceberg.parquet.TripleWriter;
 import org.apache.iceberg.parquet.VariantWriterBuilder;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
 import org.apache.iceberg.util.UUIDUtil;
@@ -75,10 +77,27 @@ import org.apache.spark.unsafe.types.VariantVal;
 public class SparkParquetWriters {
   private SparkParquetWriters() {}
 
-  @SuppressWarnings("unchecked")
   public static <T> ParquetValueWriter<T> buildWriter(StructType dfSchema, MessageType type) {
+    return buildWriter(null, type, dfSchema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> ParquetValueWriter<T> buildWriter(
+      Schema icebergSchema, MessageType type, StructType dfSchema) {
+    return (ParquetValueWriter<T>)
+        ParquetWithSparkSchemaVisitor.visit(
+            dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
+            type,
+            new WriteBuilder(type));
+  }
+
+  public static <T> ParquetValueWriter<T> buildWriter(
+      StructType dfSchema, MessageType type, Schema icebergSchema) {
     return (ParquetValueWriter<T>)
-        ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type));
+        ParquetWithSparkSchemaVisitor.visit(
+            dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
+            type,
+            new WriteBuilder(type));
   }
 
   private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 8e25e81a05..55f9fc1768 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -30,6 +30,8 @@ import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +77,7 @@ public class VectorizedSparkParquetReaders {
     return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
   }
 
-  public static CometColumnarBatchReader buildCometReader(
+  public static VectorizedReader<ColumnarBatch> buildCometReader(
       Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
     return (CometColumnarBatchReader)
         TypeWithSchemaVisitor.visit(
@@ -88,6 +90,13 @@ public class VectorizedSparkParquetReaders {
                 readers -> new CometColumnarBatchReader(readers, expectedSchema)));
   }
 
+  /** A subclass of ColumnarBatch to identify Comet readers. */
+  public static class CometColumnarBatch extends ColumnarBatch {
+    public CometColumnarBatch(ColumnVector[] columns) {
+      super(columns);
+    }
+  }
+
   // enables unsafe memory access to avoid costly checks to see if index is within bounds
   // as long as it is not configured explicitly (see BoundsChecking in Arrow)
   private static void enableUnsafeMemoryAccess() {
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index ff30f29aea..89c03a4c2b 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -19,7 +19,6 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -29,21 +28,18 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.OrcBatchReadConf;
 import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter;
 import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil;
 import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnVector;
@@ -76,79 +72,37 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
       Expression residual,
       Map<Integer, ?> idToConstant,
       @Nonnull SparkDeleteFilter deleteFilter) {
-    CloseableIterable<ColumnarBatch> iterable;
-    switch (format) {
-      case PARQUET:
-        iterable =
-            newParquetIterable(
-                inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema());
-        break;
-      case ORC:
-        iterable = newOrcIterable(inputFile, start, length, residual, idToConstant);
-        break;
-      default:
-        throw new UnsupportedOperationException(
-            "Format: " + format + " not supported for batched reads");
+    Class<? extends ColumnarBatch> readType =
+        useComet() ? VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class;
+    ReadBuilder<ColumnarBatch, ?> readBuilder =
+        FormatModelRegistry.readBuilder(format, readType, inputFile);
+
+    if (parquetConf != null) {
+      readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize());
+    } else if (orcConf != null) {
+      readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize());
     }
 
-    return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
-  }
+    CloseableIterable<ColumnarBatch> iterable =
+        readBuilder
+            .project(deleteFilter.requiredSchema())
+            .idToConstant(idToConstant)
+            .split(start, length)
+            .filter(residual)
+            .caseSensitive(caseSensitive())
+            // Spark eagerly consumes the batches. So the underlying memory allocated could be
+            // reused without worrying about subsequent reads clobbering over each other. This
+            // improves read performance as every batch read doesn't have to pay the cost of
+            // allocating memory.
+            .reuseContainers()
+            .withNameMapping(nameMapping())
+            .build();
 
-  private CloseableIterable<ColumnarBatch> newParquetIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant,
-      Schema requiredSchema) {
-    return Parquet.read(inputFile)
-        .project(requiredSchema)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema -> {
-              if (parquetConf.readerType() == ParquetReaderType.COMET) {
-                return VectorizedSparkParquetReaders.buildCometReader(
-                    requiredSchema, fileSchema, idToConstant);
-              } else {
-                return VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema, fileSchema, idToConstant);
-              }
-            })
-        .recordsPerBatch(parquetConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
-        // without worrying about subsequent reads clobbering over each other. This improves
-        // read performance as every batch read doesn't have to pay the cost of allocating memory.
-        .reuseContainers()
-        .withNameMapping(nameMapping())
-        .build();
+    return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
   }
 
-  private CloseableIterable<ColumnarBatch> newOrcIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant) {
-    Set<Integer> constantFieldIds = idToConstant.keySet();
-    Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
-    Sets.SetView<Integer> constantAndMetadataFieldIds =
-        Sets.union(constantFieldIds, metadataFieldIds);
-    Schema schemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
-
-    return ORC.read(inputFile)
-        .project(schemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema ->
-                VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
-        .recordsPerBatch(orcConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
+  private boolean useComet() {
+    return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET;
   }
 
   @VisibleForTesting
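
For illustration only (not from the commit): a condensed sketch of the registry-based batch read path that replaces the per-format switch above, assuming the Spark format models are already registered. The batch size, method name, and class name are invented; the builder calls themselves (project, idToConstant, recordsPerBatch, reuseContainers, build) are the ones used in the diff.

import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class BatchReadSketch {
  static CloseableIterable<ColumnarBatch> openBatches(
      InputFile inputFile, Schema projection, Map<Integer, ?> idToConstant) {
    // Look up a Parquet reader that produces Spark ColumnarBatch instances.
    ReadBuilder<ColumnarBatch, ?> readBuilder =
        FormatModelRegistry.readBuilder(FileFormat.PARQUET, ColumnarBatch.class, inputFile);
    return readBuilder
        .project(projection)
        .idToConstant(idToConstant)
        .recordsPerBatch(4096) // hypothetical batch size; the reader takes it from the read conf
        .reuseContainers()
        .build();
  }
}
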
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index c12931e786..53d44e760a 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -20,22 +20,15 @@ package org.apache.iceberg.spark.source;
 
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 
 abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
@@ -58,69 +51,15 @@ abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow,
       Expression residual,
       Schema projection,
       Map<Integer, ?> idToConstant) {
-    switch (format) {
-      case PARQUET:
-        return newParquetIterable(file, start, length, residual, projection, idToConstant);
-
-      case AVRO:
-        return newAvroIterable(file, start, length, projection, idToConstant);
-
-      case ORC:
-        return newOrcIterable(file, start, length, residual, projection, idToConstant);
-
-      default:
-        throw new UnsupportedOperationException("Cannot read unknown format: " + format);
-    }
-  }
-
-  private CloseableIterable<InternalRow> newAvroIterable(
-      InputFile file, long start, long length, Schema projection, Map<Integer, ?> idToConstant) {
-    return Avro.read(file)
-        .reuseContainers()
+    ReadBuilder<InternalRow, ?> reader =
+        FormatModelRegistry.readBuilder(format, InternalRow.class, file);
+    return reader
         .project(projection)
-        .split(start, length)
-        .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant))
-        .withNameMapping(nameMapping())
-        .build();
-  }
-
-  private CloseableIterable<InternalRow> newParquetIterable(
-      InputFile file,
-      long start,
-      long length,
-      Expression residual,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    return Parquet.read(file)
+        .idToConstant(idToConstant)
         .reuseContainers()
         .split(start, length)
-        .project(readSchema)
-        .createReaderFunc(
-            fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
-        .filter(residual)
         .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
-  }
-
-  private CloseableIterable<InternalRow> newOrcIterable(
-      InputFile file,
-      long start,
-      long length,
-      Expression residual,
-      Schema readSchema,
-      Map<Integer, ?> idToConstant) {
-    Schema readSchemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(
-            readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    return ORC.read(file)
-        .project(readSchemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createReaderFunc(
-            readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
         .filter(residual)
-        .caseSensitive(caseSensitive())
         .withNameMapping(nameMapping())
         .build();
   }
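
For illustration only (not from the commit): the row-level counterpart, a sketch of the same registry lookup with InternalRow.class, which now covers Parquet, Avro, and ORC through a single builder chain instead of the removed per-format helpers. The method and class names are invented.

import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.spark.sql.catalyst.InternalRow;

class RowReadSketch {
  static CloseableIterable<InternalRow> openRows(
      FileFormat format, InputFile file, Schema projection, Map<Integer, ?> idToConstant) {
    // One code path for every file format registered for InternalRow.
    return FormatModelRegistry.readBuilder(format, InternalRow.class, file)
        .project(projection)
        .idToConstant(idToConstant)
        .reuseContainers()
        .build();
  }
}
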
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index a93db17e4a..2b3bf73d56 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -23,13 +23,20 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
@@ -40,14 +47,20 @@ import org.apache.iceberg.spark.data.SparkAvroWriter;
 import org.apache.iceberg.spark.data.SparkOrcWriter;
 import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
-
-class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
-  private StructType dataSparkType;
-  private StructType equalityDeleteSparkType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow, StructType> {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkFileWriterFactory.class);
+  // We need to use old writers to write position deletes with row data, which is a deprecated
+  // feature.
+  private final boolean useDeprecatedPositionDeleteWriter;
   private StructType positionDeleteSparkType;
+  private final Schema positionDeleteRowSchema;
+  private final Table table;
+  private final FileFormat format;
   private final Map<String, String> writeProperties;
 
   /**
@@ -75,18 +88,26 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     super(
         table,
         dataFileFormat,
+        InternalRow.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        positionDeleteRowSchema);
+        writeProperties,
+        useOrConvert(dataSparkType, dataSchema),
+        useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
 
-    this.dataSparkType = dataSparkType;
-    this.equalityDeleteSparkType = equalityDeleteSparkType;
-    this.positionDeleteSparkType = positionDeleteSparkType;
+    this.table = table;
+    this.format = dataFileFormat;
     this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
+    this.positionDeleteRowSchema = positionDeleteRowSchema;
+    this.positionDeleteSparkType = positionDeleteSparkType;
+    this.useDeprecatedPositionDeleteWriter =
+        positionDeleteRowSchema != null
+            || (positionDeleteSparkType != null
+                && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined());
   }
 
   SparkFileWriterFactory(
@@ -105,119 +126,110 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     super(
         table,
         dataFileFormat,
+        InternalRow.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        ImmutableMap.of());
+        writeProperties,
+        useOrConvert(dataSparkType, dataSchema),
+        useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
 
-    this.dataSparkType = dataSparkType;
-    this.equalityDeleteSparkType = equalityDeleteSparkType;
-    this.positionDeleteSparkType = null;
+    this.table = table;
+    this.format = dataFileFormat;
     this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
+    this.positionDeleteRowSchema = null;
+    this.useDeprecatedPositionDeleteWriter = false;
   }
 
   static Builder builderFor(Table table) {
     return new Builder(table);
   }
 
-  @Override
-  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
-    boolean withRow =
-        positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
-    if (withRow) {
-      // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos
-      StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
-      StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
-      builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
-    }
-
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
-    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.setAll(writeProperties);
-  }
-
-  @Override
-  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(SparkOrcWriter::new);
-    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
-    builder.setAll(writeProperties);
-  }
-
-  private StructType dataSparkType() {
-    if (dataSparkType == null) {
-      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
-      this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
-    }
-
-    return dataSparkType;
-  }
-
-  private StructType equalityDeleteSparkType() {
-    if (equalityDeleteSparkType == null) {
-      Preconditions.checkNotNull(
-          equalityDeleteRowSchema(), "Equality delete schema must not be null");
-      this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema());
-    }
-
-    return equalityDeleteSparkType;
-  }
-
   private StructType positionDeleteSparkType() {
     if (positionDeleteSparkType == null) {
       // wrap the optional row schema into the position delete schema containing path and position
-      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+      Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema);
       this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema);
     }
 
     return positionDeleteSparkType;
   }
 
+  @Override
+  public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
+      EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+    if (!useDeprecatedPositionDeleteWriter) {
+      return super.newPositionDeleteWriter(file, spec, partition);
+    } else {
+      LOG.warn("Position deletes with deleted rows are deprecated and will be removed in 1.12.0.");
+      Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties();
+      MetricsConfig metricsConfig =
+          table == null
+              ? MetricsConfig.forPositionDelete()
+              : MetricsConfig.forPositionDelete(table);
+
+      try {
+        return switch (format) {
+          case AVRO ->
+              Avro.writeDeletes(file)
+                  .createWriterFunc(
+                      ignored ->
+                          new SparkAvroWriter(
+                              (StructType)
+                                  positionDeleteSparkType()
+                                      .apply(DELETE_FILE_ROW_FIELD_NAME)
+                                      .dataType()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case ORC ->
+              ORC.writeDeletes(file)
+                  .createWriterFunc(SparkOrcWriter::new)
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          case PARQUET ->
+              Parquet.writeDeletes(file)
+                  .createWriterFunc(
+                      msgType ->
+                          SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType))
+                  .transformPaths(path -> UTF8String.fromString(path.toString()))
+                  .setAll(properties)
+                  .setAll(writeProperties)
+                  .metricsConfig(metricsConfig)
+                  .withPartition(partition)
+                  .overwrite()
+                  .metricsConfig(metricsConfig)
+                  .rowSchema(positionDeleteRowSchema)
+                  .withSpec(spec)
+                  .withKeyMetadata(file.keyMetadata())
+                  .buildPositionWriter();
+          default ->
+              throw new UnsupportedOperationException(
+                  "Cannot write pos-deletes for unsupported file format: " + format);
+        };
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to create new position delete writer", e);
+      }
+    }
+  }
+
   static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
@@ -340,4 +352,14 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
           writeProperties);
     }
   }
+
+  private static StructType useOrConvert(StructType sparkType, Schema schema) {
+    if (sparkType != null) {
+      return sparkType;
+    } else if (schema != null) {
+      return SparkSchemaUtil.convert(schema);
+    } else {
+      return null;
+    }
+  }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
new file mode 100644
index 0000000000..677f2e950b
--- /dev/null
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class SparkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            SparkParquetWriters::buildWriter,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            VectorizedSparkParquetReaders.CometColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkParquetReaders.buildCometReader(
+                    icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            InternalRow.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                new SparkOrcWriter(icebergSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new SparkOrcReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            ColumnarBatch.class,
+            StructType.class,
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                VectorizedSparkOrcReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private SparkFormatModels() {}
+}
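
For illustration only (not from the commit): a sketch of how the registrations above tie the pieces together. It assumes register() has been wired to run before the registry is queried and that calling it at that point is safe; how and when Spark actually invokes it is not shown in this diff, and the class and method names below are invented.

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.spark.source.SparkFormatModels;
import org.apache.spark.sql.catalyst.InternalRow;

class RegistrationSketch {
  static ReadBuilder<InternalRow, ?> parquetRowReader(InputFile file) {
    // Register the Spark read/write models for Avro, Parquet, and ORC (assumed safe to call here).
    SparkFormatModels.register();
    // Afterwards the registry can resolve a reader by format and engine type.
    return FormatModelRegistry.readBuilder(FileFormat.PARQUET, InternalRow.class, file);
  }
}
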

