This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e34ec24849 Core, Flink, Spark: Further deprecations for the positional deletes with row data (#14210)
e34ec24849 is described below

commit e34ec248495cdee6259c35c2f44807b2d910dc4e
Author: pvary <[email protected]>
AuthorDate: Thu Oct 16 07:11:33 2025 +0200

    Core, Flink, Spark: Further deprecations for the positional deletes with row data (#14210)
---
 .../apache/iceberg/data/BaseFileWriterFactory.java | 34 ++++++++++++++
 .../iceberg/data/GenericAppenderFactory.java       | 43 +++++++++++++++++
 .../iceberg/data/GenericFileWriterFactory.java     | 43 +++++++++++++++--
 .../iceberg/data/TestGenericFileWriterFactory.java |  9 +++-
 .../org/apache/iceberg/io/TestWriterMetrics.java   | 54 +++++++++-------------
 .../iceberg/flink/sink/FlinkAppenderFactory.java   | 17 +++++++
 .../iceberg/flink/sink/FlinkFileWriterFactory.java | 51 +-------------------
 .../flink/sink/TestFlinkFileWriterFactory.java     |  9 +++-
 .../flink/sink/TestFlinkPartitioningWriters.java   |  1 -
 .../flink/sink/TestFlinkPositionDeltaWriters.java  |  1 -
 .../flink/sink/TestFlinkRollingFileWriters.java    |  1 -
 .../iceberg/flink/sink/TestFlinkWriterMetrics.java | 18 ++++++--
 .../flink/source/TestFlinkMergingMetrics.java      |  1 -
 .../iceberg/flink/sink/FlinkAppenderFactory.java   | 17 +++++++
 .../iceberg/flink/sink/FlinkFileWriterFactory.java | 51 +-------------------
 .../flink/sink/TestFlinkFileWriterFactory.java     |  9 +++-
 .../flink/sink/TestFlinkPartitioningWriters.java   |  1 -
 .../flink/sink/TestFlinkPositionDeltaWriters.java  |  1 -
 .../flink/sink/TestFlinkRollingFileWriters.java    |  1 -
 .../iceberg/flink/sink/TestFlinkWriterMetrics.java | 18 ++++++--
 .../flink/source/TestFlinkMergingMetrics.java      |  1 -
 .../iceberg/flink/sink/FlinkAppenderFactory.java   | 17 +++++++
 .../iceberg/flink/sink/FlinkFileWriterFactory.java | 51 +-------------------
 .../flink/sink/TestFlinkFileWriterFactory.java     |  9 +++-
 .../flink/sink/TestFlinkPartitioningWriters.java   |  1 -
 .../flink/sink/TestFlinkPositionDeltaWriters.java  |  1 -
 .../flink/sink/TestFlinkRollingFileWriters.java    |  1 -
 .../iceberg/flink/sink/TestFlinkWriterMetrics.java | 18 ++++++--
 .../flink/source/TestFlinkMergingMetrics.java      |  1 -
 .../spark/source/SparkFileWriterFactory.java       | 47 +++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      |  5 +-
 .../spark/source/SparkFileWriterFactory.java       | 47 +++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      |  5 +-
 .../spark/source/SparkFileWriterFactory.java       | 47 +++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      |  5 +-
 35 files changed, 422 insertions(+), 214 deletions(-)

diff --git 
a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java 
b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index 64aafbc850..818fa0b784 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -53,6 +53,35 @@ public abstract class BaseFileWriterFactory<T> implements 
FileWriterFactory<T>,
   private final Schema positionDeleteRowSchema;
   private final Map<String, String> writerProperties;
 
+  protected BaseFileWriterFactory(
+      Table table,
+      FileFormat dataFileFormat,
+      Schema dataSchema,
+      SortOrder dataSortOrder,
+      FileFormat deleteFileFormat,
+      int[] equalityFieldIds,
+      Schema equalityDeleteRowSchema,
+      SortOrder equalityDeleteSortOrder,
+      Map<String, String> writerProperties) {
+    this.table = table;
+    this.dataFileFormat = dataFileFormat;
+    this.dataSchema = dataSchema;
+    this.dataSortOrder = dataSortOrder;
+    this.deleteFileFormat = deleteFileFormat;
+    this.equalityFieldIds = equalityFieldIds;
+    this.equalityDeleteRowSchema = equalityDeleteRowSchema;
+    this.equalityDeleteSortOrder = equalityDeleteSortOrder;
+    this.writerProperties = writerProperties;
+    this.positionDeleteRowSchema = null;
+  }
+
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #BaseFileWriterFactory(Table, FileFormat, Schema, SortOrder, 
FileFormat, int[], Schema,
+   *     SortOrder, Map)} instead.
+   */
+  @Deprecated
   protected BaseFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -333,6 +362,11 @@ public abstract class BaseFileWriterFactory<T> implements 
FileWriterFactory<T>,
     return equalityDeleteRowSchema;
   }
 
+  /**
+   * @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported.
+   */
+  @Deprecated
   protected Schema positionDeleteRowSchema() {
     return positionDeleteRowSchema;
   }
diff --git 
a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java 
b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
index 51b52c7dcb..6c32f0f545 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
@@ -62,6 +62,12 @@ public class GenericAppenderFactory implements 
FileAppenderFactory<Record> {
     this(schema, spec, null, null, null);
   }
 
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #GenericAppenderFactory(Schema, PartitionSpec, int[], Schema)} 
instead.
+   */
+  @Deprecated
   public GenericAppenderFactory(
       Schema schema,
       PartitionSpec spec,
@@ -71,6 +77,39 @@ public class GenericAppenderFactory implements 
FileAppenderFactory<Record> {
     this(null, schema, spec, null, equalityFieldIds, eqDeleteRowSchema, 
posDeleteRowSchema);
   }
 
+  /**
+   * Constructor for GenericAppenderFactory.
+   *
+   * @param schema the schema of the records to write
+   * @param spec the partition spec of the records
+   * @param equalityFieldIds the field ids for equality delete
+   * @param eqDeleteRowSchema the schema for equality delete rows
+   */
+  public GenericAppenderFactory(
+      Schema schema, PartitionSpec spec, int[] equalityFieldIds, Schema 
eqDeleteRowSchema) {
+    this(null, schema, spec, null, equalityFieldIds, eqDeleteRowSchema, null);
+  }
+
+  /**
+   * Constructor for GenericAppenderFactory.
+   *
+   * @param table iceberg table
+   * @param schema the schema of the records to write
+   * @param spec the partition spec of the records
+   * @param config the configuration for the writer
+   * @param equalityFieldIds the field ids for equality delete
+   * @param eqDeleteRowSchema the schema for equality delete rows
+   */
+  public GenericAppenderFactory(
+      Table table,
+      Schema schema,
+      PartitionSpec spec,
+      Map<String, String> config,
+      int[] equalityFieldIds,
+      Schema eqDeleteRowSchema) {
+    this(table, schema, spec, config, equalityFieldIds, eqDeleteRowSchema, 
null);
+  }
+
   /**
    * Constructor for GenericAppenderFactory.
    *
@@ -81,7 +120,11 @@ public class GenericAppenderFactory implements 
FileAppenderFactory<Record> {
    * @param equalityFieldIds the field ids for equality delete
    * @param eqDeleteRowSchema the schema for equality delete rows
    * @param posDeleteRowSchema the schema for position delete rows
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #GenericAppenderFactory(Table, Schema, PartitionSpec, Map, int[], 
Schema)} instead.
    */
+  @Deprecated
   public GenericAppenderFactory(
       Table table,
       Schema schema,
diff --git 
a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java 
b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
index 58ea9bafd6..a9c39bfd85 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
@@ -38,6 +38,34 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
 public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
 
+  GenericFileWriterFactory(
+      Table table,
+      FileFormat dataFileFormat,
+      Schema dataSchema,
+      SortOrder dataSortOrder,
+      FileFormat deleteFileFormat,
+      int[] equalityFieldIds,
+      Schema equalityDeleteRowSchema,
+      SortOrder equalityDeleteSortOrder) {
+    super(
+        table,
+        dataFileFormat,
+        dataSchema,
+        dataSortOrder,
+        deleteFileFormat,
+        equalityFieldIds,
+        equalityDeleteRowSchema,
+        equalityDeleteSortOrder,
+        ImmutableMap.of());
+  }
+
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #GenericFileWriterFactory(Table, FileFormat, Schema, SortOrder, 
FileFormat, int[], Schema,
+   *     SortOrder)} instead.
+   */
+  @Deprecated
   GenericFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -199,14 +227,19 @@ public class GenericFileWriterFactory extends 
BaseFileWriterFactory<Record> {
       return this;
     }
 
-    /** Sets default writer properties. */
-    public Builder writerProperties(Map<String, String> newWriterProperties) {
-      this.writerProperties = newWriterProperties;
+    /**
+     * @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
+    @Deprecated
+    Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
+      this.positionDeleteRowSchema = newPositionDeleteRowSchema;
       return this;
     }
 
-    Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
-      this.positionDeleteRowSchema = newPositionDeleteRowSchema;
+    /** Sets default writer properties. */
+    public Builder writerProperties(Map<String, String> newWriterProperties) {
+      this.writerProperties = newWriterProperties;
       return this;
     }
 
diff --git 
a/data/src/test/java/org/apache/iceberg/data/TestGenericFileWriterFactory.java 
b/data/src/test/java/org/apache/iceberg/data/TestGenericFileWriterFactory.java
index 2ab908c8c7..fc8d420742 100644
--- 
a/data/src/test/java/org/apache/iceberg/data/TestGenericFileWriterFactory.java
+++ 
b/data/src/test/java/org/apache/iceberg/data/TestGenericFileWriterFactory.java
@@ -18,12 +18,15 @@
  */
 package org.apache.iceberg.data;
 
+import java.io.IOException;
 import java.util.List;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestFileWriterFactory;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
 
 public class TestGenericFileWriterFactory extends 
TestFileWriterFactory<Record> {
 
@@ -39,7 +42,6 @@ public class TestGenericFileWriterFactory extends 
TestFileWriterFactory<Record>
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
@@ -57,4 +59,9 @@ public class TestGenericFileWriterFactory extends 
TestFileWriterFactory<Record>
     records.forEach(set::add);
     return set;
   }
+
+  @Disabled("Position deletes with row data are no longer supported")
+  @Override
+  @TestTemplate
+  public void testPositionDeleteWriterWithRow() throws IOException {}
 }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java 
b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 5665fd0171..4722ae9895 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -169,34 +169,18 @@ public abstract class TestWriterMetrics<T> {
     // structField.longValue)
 
     Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
-
     assertThat(Conversions.<T>fromByteBuffer(Types.StringType.get(), 
lowerBounds.get(pathFieldId)))
         .isEqualTo(CharBuffer.wrap("File A"));
     assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(), 
lowerBounds.get(posFieldId)))
         .isEqualTo(1L);
-
-    assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(), 
lowerBounds.get(1)))
-        .isEqualTo(3);
-    assertThat(lowerBounds).doesNotContainKey(2);
-    assertThat(lowerBounds).doesNotContainKey(3);
-    assertThat(lowerBounds).doesNotContainKey(4);
-    assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(), 
lowerBounds.get(5)))
-        .isEqualTo(3L);
+    checkRowStatistics(lowerBounds);
 
     Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
-
     assertThat(Conversions.<T>fromByteBuffer(Types.StringType.get(), 
upperBounds.get(pathFieldId)))
         .isEqualTo(CharBuffer.wrap("File A"));
     assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(), 
upperBounds.get(posFieldId)))
         .isEqualTo(1L);
-
-    assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(), 
upperBounds.get(1)))
-        .isEqualTo(3);
-    assertThat(upperBounds).doesNotContainKey(2);
-    assertThat(upperBounds).doesNotContainKey(3);
-    assertThat(upperBounds).doesNotContainKey(4);
-    assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(), 
upperBounds.get(5)))
-        .isEqualTo(3L);
+    checkRowStatistics(upperBounds);
   }
 
   @TestTemplate
@@ -222,19 +206,8 @@ public abstract class TestWriterMetrics<T> {
     DeleteFile deleteFile = deleteWriter.toDeleteFile();
 
     // should have NO bounds for path and position as the file covers multiple 
data paths
-    Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
-    assertThat(lowerBounds).hasSize(2);
-    assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(), 
lowerBounds.get(1)))
-        .isEqualTo(3);
-    assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(), 
lowerBounds.get(5)))
-        .isEqualTo(3L);
-
-    Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
-    assertThat(upperBounds).hasSize(2);
-    assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(), 
upperBounds.get(1)))
-        .isEqualTo(3);
-    assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(), 
upperBounds.get(5)))
-        .isEqualTo(3L);
+    checkNotExistingRowStatistics(deleteFile.lowerBounds());
+    checkNotExistingRowStatistics(deleteFile.upperBounds());
   }
 
   @TestTemplate
@@ -378,4 +351,23 @@ public abstract class TestWriterMetrics<T> {
           .isEqualTo(1);
     }
   }
+
+  protected void checkRowStatistics(Map<Integer, ByteBuffer> bounds) {
+    assertThat(bounds).hasSize(4);
+    assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(), 
bounds.get(1)))
+        .isEqualTo(3);
+    assertThat(bounds).doesNotContainKey(2);
+    assertThat(bounds).doesNotContainKey(3);
+    assertThat(bounds).doesNotContainKey(4);
+    assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(), 
bounds.get(5)))
+        .isEqualTo(3L);
+  }
+
+  protected void checkNotExistingRowStatistics(Map<Integer, ByteBuffer> 
bounds) {
+    assertThat(bounds).hasSize(2);
+    assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(), 
bounds.get(1)))
+        .isEqualTo(3);
+    assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(), 
bounds.get(5)))
+        .isEqualTo(3L);
+  }
 }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index d21bbec81f..07a068391c 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -66,6 +66,23 @@ public class FlinkAppenderFactory implements 
FileAppenderFactory<RowData>, Seria
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
+  public FlinkAppenderFactory(
+      Table table,
+      Schema schema,
+      RowType flinkSchema,
+      Map<String, String> props,
+      PartitionSpec spec,
+      int[] equalityFieldIds,
+      Schema eqDeleteRowSchema) {
+    this(table, schema, flinkSchema, props, spec, equalityFieldIds, 
eqDeleteRowSchema, null);
+  }
+
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #FlinkAppenderFactory(Table, Schema, RowType, Map, PartitionSpec, 
int[], Schema)} instead.
+   */
+  @Deprecated
   public FlinkAppenderFactory(
       Table table,
       Schema schema,
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index 948dd252d5..b3ada41737 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
@@ -38,7 +37,6 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.FlinkAvroWriter;
 import org.apache.iceberg.flink.data.FlinkOrcWriter;
 import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -47,7 +45,6 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> 
implements Serializable {
   private RowType dataFlinkType;
   private RowType equalityDeleteFlinkType;
-  private RowType positionDeleteFlinkType;
 
   private FlinkFileWriterFactory(
       Table table,
@@ -60,8 +57,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
       Schema equalityDeleteRowSchema,
       RowType equalityDeleteFlinkType,
       SortOrder equalityDeleteSortOrder,
-      Schema positionDeleteRowSchema,
-      RowType positionDeleteFlinkType,
       Map<String, String> writeProperties) {
 
     super(
@@ -73,12 +68,10 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        positionDeleteRowSchema,
         writeProperties);
 
     this.dataFlinkType = dataFlinkType;
     this.equalityDeleteFlinkType = equalityDeleteFlinkType;
-    this.positionDeleteFlinkType = positionDeleteFlinkType;
   }
 
   static Builder builderFor(Table table) {
@@ -96,15 +89,7 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
   }
 
   @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
-    int rowFieldIndex = 
positionDeleteFlinkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME);
-    if (rowFieldIndex >= 0) {
-      // FlinkAvroWriter accepts just the Flink type of the row ignoring the 
path and pos
-      RowType positionDeleteRowFlinkType =
-          (RowType) positionDeleteFlinkType().getTypeAt(rowFieldIndex);
-      builder.createWriterFunc(ignored -> new 
FlinkAvroWriter(positionDeleteRowFlinkType));
-    }
-  }
+  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
 
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
@@ -119,8 +104,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
 
   @Override
   protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> FlinkParquetWriters.buildWriter(positionDeleteFlinkType(), 
msgType));
     builder.transformPaths(path -> StringData.fromString(path.toString()));
   }
 
@@ -138,8 +121,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
 
   @Override
   protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> 
FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema));
     builder.transformPaths(path -> StringData.fromString(path.toString()));
   }
 
@@ -162,17 +143,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
     return equalityDeleteFlinkType;
   }
 
-  private RowType positionDeleteFlinkType() {
-    if (positionDeleteFlinkType == null) {
-      // wrap the optional row schema into the position delete schema that 
contains path and
-      // position
-      Schema positionDeleteSchema = 
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
-      this.positionDeleteFlinkType = 
FlinkSchemaUtil.convert(positionDeleteSchema);
-    }
-
-    return positionDeleteFlinkType;
-  }
-
   public static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
@@ -184,8 +154,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
     private Schema equalityDeleteRowSchema;
     private RowType equalityDeleteFlinkType;
     private SortOrder equalityDeleteSortOrder;
-    private Schema positionDeleteRowSchema;
-    private RowType positionDeleteFlinkType;
     private Map<String, String> writerProperties = ImmutableMap.of();
 
     public Builder(Table table) {
@@ -257,21 +225,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
       return this;
     }
 
-    public Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
-      this.positionDeleteRowSchema = newPositionDeleteRowSchema;
-      return this;
-    }
-
-    /**
-     * Sets a Flink type for position deletes.
-     *
-     * <p>If not set, the value is derived from the provided Iceberg schema.
-     */
-    public Builder positionDeleteFlinkType(RowType newPositionDeleteFlinkType) 
{
-      this.positionDeleteFlinkType = newPositionDeleteFlinkType;
-      return this;
-    }
-
     /** Sets default writer properties. */
     public Builder writerProperties(Map<String, String> newWriterProperties) {
       this.writerProperties = newWriterProperties;
@@ -296,8 +249,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
           equalityDeleteRowSchema,
           equalityDeleteFlinkType,
           equalityDeleteSortOrder,
-          positionDeleteRowSchema,
-          positionDeleteFlinkType,
           writerProperties);
     }
   }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
index 414ee40d13..83997093af 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import java.io.IOException;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -29,6 +30,8 @@ import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestFileWriterFactory;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
 
 public class TestFlinkFileWriterFactory extends TestFileWriterFactory<RowData> 
{
 
@@ -44,7 +47,6 @@ public class TestFlinkFileWriterFactory extends 
TestFileWriterFactory<RowData> {
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
@@ -63,4 +65,9 @@ public class TestFlinkFileWriterFactory extends 
TestFileWriterFactory<RowData> {
     }
     return set;
   }
+
+  @Disabled("Position deletes with row data are no longer supported")
+  @Override
+  @TestTemplate
+  public void testPositionDeleteWriterWithRow() throws IOException {}
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
index 939ed2be7d..ade4f15f72 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -55,7 +55,6 @@ public class TestFlinkPartitioningWriters<T> extends 
TestPartitioningWriters<Row
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
index 3050752d1c..e4212ffa7d 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
@@ -44,7 +44,6 @@ public class TestFlinkPositionDeltaWriters extends 
TestPositionDeltaWriters<RowD
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index 03051b69cf..b8f37453e6 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -40,7 +40,6 @@ public class TestFlinkRollingFileWriters extends 
TestRollingFileWriters<RowData>
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index e6d64ef2c7..bcfcf5ffa5 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -18,6 +18,10 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -38,15 +42,13 @@ public class TestFlinkWriterMetrics extends 
TestWriterMetrics<RowData> {
         .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
   @Override
   protected RowData toRow(Integer id, String data, boolean boolValue, Long 
longValue) {
     GenericRowData nested = GenericRowData.of(boolValue, longValue);
-    GenericRowData row = GenericRowData.of(id, StringData.fromString(data), 
nested);
-    return row;
+    return GenericRowData.of(id, StringData.fromString(data), nested);
   }
 
   @Override
@@ -57,4 +59,14 @@ public class TestFlinkWriterMetrics extends 
TestWriterMetrics<RowData> {
     }
     return row;
   }
+
+  @Override
+  protected void checkRowStatistics(Map<Integer, ByteBuffer> bounds) {
+    assertThat(bounds).hasSize(2);
+  }
+
+  @Override
+  protected void checkNotExistingRowStatistics(Map<Integer, ByteBuffer> 
bounds) {
+    assertThat(bounds).isNull();
+  }
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 59a4c3118c..67fc79213c 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -55,7 +55,6 @@ public class TestFlinkMergingMetrics extends 
TestMergingMetrics<RowData> {
                 ImmutableMap.of(),
                 PartitionSpec.unpartitioned(),
                 null,
-                null,
                 null)
             .newAppender(
                 Files.localOutput(File.createTempFile("junit", null, 
tempDir)), fileFormat);
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index d21bbec81f..07a068391c 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -66,6 +66,23 @@ public class FlinkAppenderFactory implements 
FileAppenderFactory<RowData>, Seria
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
+  public FlinkAppenderFactory(
+      Table table,
+      Schema schema,
+      RowType flinkSchema,
+      Map<String, String> props,
+      PartitionSpec spec,
+      int[] equalityFieldIds,
+      Schema eqDeleteRowSchema) {
+    this(table, schema, flinkSchema, props, spec, equalityFieldIds, 
eqDeleteRowSchema, null);
+  }
+
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #FlinkAppenderFactory(Table, Schema, RowType, Map, PartitionSpec, 
int[], Schema)} instead.
+   */
+  @Deprecated
   public FlinkAppenderFactory(
       Table table,
       Schema schema,
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index 948dd252d5..b3ada41737 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
@@ -38,7 +37,6 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.FlinkAvroWriter;
 import org.apache.iceberg.flink.data.FlinkOrcWriter;
 import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -47,7 +45,6 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> 
implements Serializable {
   private RowType dataFlinkType;
   private RowType equalityDeleteFlinkType;
-  private RowType positionDeleteFlinkType;
 
   private FlinkFileWriterFactory(
       Table table,
@@ -60,8 +57,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
       Schema equalityDeleteRowSchema,
       RowType equalityDeleteFlinkType,
       SortOrder equalityDeleteSortOrder,
-      Schema positionDeleteRowSchema,
-      RowType positionDeleteFlinkType,
       Map<String, String> writeProperties) {
 
     super(
@@ -73,12 +68,10 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        positionDeleteRowSchema,
         writeProperties);
 
     this.dataFlinkType = dataFlinkType;
     this.equalityDeleteFlinkType = equalityDeleteFlinkType;
-    this.positionDeleteFlinkType = positionDeleteFlinkType;
   }
 
   static Builder builderFor(Table table) {
@@ -96,15 +89,7 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
   }
 
   @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
-    int rowFieldIndex = 
positionDeleteFlinkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME);
-    if (rowFieldIndex >= 0) {
-      // FlinkAvroWriter accepts just the Flink type of the row ignoring the 
path and pos
-      RowType positionDeleteRowFlinkType =
-          (RowType) positionDeleteFlinkType().getTypeAt(rowFieldIndex);
-      builder.createWriterFunc(ignored -> new 
FlinkAvroWriter(positionDeleteRowFlinkType));
-    }
-  }
+  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
 
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
@@ -119,8 +104,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
 
   @Override
   protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> FlinkParquetWriters.buildWriter(positionDeleteFlinkType(), 
msgType));
     builder.transformPaths(path -> StringData.fromString(path.toString()));
   }
 
@@ -138,8 +121,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
 
   @Override
   protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> 
FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema));
     builder.transformPaths(path -> StringData.fromString(path.toString()));
   }
 
@@ -162,17 +143,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
     return equalityDeleteFlinkType;
   }
 
-  private RowType positionDeleteFlinkType() {
-    if (positionDeleteFlinkType == null) {
-      // wrap the optional row schema into the position delete schema that 
contains path and
-      // position
-      Schema positionDeleteSchema = 
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
-      this.positionDeleteFlinkType = 
FlinkSchemaUtil.convert(positionDeleteSchema);
-    }
-
-    return positionDeleteFlinkType;
-  }
-
   public static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
@@ -184,8 +154,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
     private Schema equalityDeleteRowSchema;
     private RowType equalityDeleteFlinkType;
     private SortOrder equalityDeleteSortOrder;
-    private Schema positionDeleteRowSchema;
-    private RowType positionDeleteFlinkType;
     private Map<String, String> writerProperties = ImmutableMap.of();
 
     public Builder(Table table) {
@@ -257,21 +225,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
       return this;
     }
 
-    public Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
-      this.positionDeleteRowSchema = newPositionDeleteRowSchema;
-      return this;
-    }
-
-    /**
-     * Sets a Flink type for position deletes.
-     *
-     * <p>If not set, the value is derived from the provided Iceberg schema.
-     */
-    public Builder positionDeleteFlinkType(RowType newPositionDeleteFlinkType) 
{
-      this.positionDeleteFlinkType = newPositionDeleteFlinkType;
-      return this;
-    }
-
     /** Sets default writer properties. */
     public Builder writerProperties(Map<String, String> newWriterProperties) {
       this.writerProperties = newWriterProperties;
@@ -296,8 +249,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
           equalityDeleteRowSchema,
           equalityDeleteFlinkType,
           equalityDeleteSortOrder,
-          positionDeleteRowSchema,
-          positionDeleteFlinkType,
           writerProperties);
     }
   }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
index 414ee40d13..83997093af 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import java.io.IOException;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -29,6 +30,8 @@ import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestFileWriterFactory;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
 
 public class TestFlinkFileWriterFactory extends TestFileWriterFactory<RowData> 
{
 
@@ -44,7 +47,6 @@ public class TestFlinkFileWriterFactory extends 
TestFileWriterFactory<RowData> {
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
@@ -63,4 +65,9 @@ public class TestFlinkFileWriterFactory extends 
TestFileWriterFactory<RowData> {
     }
     return set;
   }
+
+  @Disabled("Position deletes with row data are no longer supported")
+  @Override
+  @TestTemplate
+  public void testPositionDeleteWriterWithRow() throws IOException {}
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
index 939ed2be7d..ade4f15f72 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -55,7 +55,6 @@ public class TestFlinkPartitioningWriters<T> extends 
TestPartitioningWriters<Row
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
index 3050752d1c..e4212ffa7d 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
@@ -44,7 +44,6 @@ public class TestFlinkPositionDeltaWriters extends 
TestPositionDeltaWriters<RowD
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index 03051b69cf..b8f37453e6 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -40,7 +40,6 @@ public class TestFlinkRollingFileWriters extends 
TestRollingFileWriters<RowData>
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index e6d64ef2c7..bcfcf5ffa5 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -18,6 +18,10 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -38,15 +42,13 @@ public class TestFlinkWriterMetrics extends 
TestWriterMetrics<RowData> {
         .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
   @Override
   protected RowData toRow(Integer id, String data, boolean boolValue, Long 
longValue) {
     GenericRowData nested = GenericRowData.of(boolValue, longValue);
-    GenericRowData row = GenericRowData.of(id, StringData.fromString(data), 
nested);
-    return row;
+    return GenericRowData.of(id, StringData.fromString(data), nested);
   }
 
   @Override
@@ -57,4 +59,14 @@ public class TestFlinkWriterMetrics extends 
TestWriterMetrics<RowData> {
     }
     return row;
   }
+
+  @Override
+  protected void checkRowStatistics(Map<Integer, ByteBuffer> bounds) {
+    assertThat(bounds).hasSize(2);
+  }
+
+  @Override
+  protected void checkNotExistingRowStatistics(Map<Integer, ByteBuffer> 
bounds) {
+    assertThat(bounds).isNull();
+  }
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 59a4c3118c..67fc79213c 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -55,7 +55,6 @@ public class TestFlinkMergingMetrics extends 
TestMergingMetrics<RowData> {
                 ImmutableMap.of(),
                 PartitionSpec.unpartitioned(),
                 null,
-                null,
                 null)
             .newAppender(
                 Files.localOutput(File.createTempFile("junit", null, 
tempDir)), fileFormat);
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index d21bbec81f..07a068391c 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -66,6 +66,23 @@ public class FlinkAppenderFactory implements 
FileAppenderFactory<RowData>, Seria
   private RowType eqDeleteFlinkSchema = null;
   private RowType posDeleteFlinkSchema = null;
 
+  public FlinkAppenderFactory(
+      Table table,
+      Schema schema,
+      RowType flinkSchema,
+      Map<String, String> props,
+      PartitionSpec spec,
+      int[] equalityFieldIds,
+      Schema eqDeleteRowSchema) {
+    this(table, schema, flinkSchema, props, spec, equalityFieldIds, 
eqDeleteRowSchema, null);
+  }
+
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #FlinkAppenderFactory(Table, Schema, RowType, Map, PartitionSpec, 
int[], Schema)} instead.
+   */
+  @Deprecated
   public FlinkAppenderFactory(
       Table table,
       Schema schema,
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index 948dd252d5..b3ada41737 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
@@ -38,7 +37,6 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.data.FlinkAvroWriter;
 import org.apache.iceberg.flink.data.FlinkOrcWriter;
 import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -47,7 +45,6 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> 
implements Serializable {
   private RowType dataFlinkType;
   private RowType equalityDeleteFlinkType;
-  private RowType positionDeleteFlinkType;
 
   private FlinkFileWriterFactory(
       Table table,
@@ -60,8 +57,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
       Schema equalityDeleteRowSchema,
       RowType equalityDeleteFlinkType,
       SortOrder equalityDeleteSortOrder,
-      Schema positionDeleteRowSchema,
-      RowType positionDeleteFlinkType,
       Map<String, String> writeProperties) {
 
     super(
@@ -73,12 +68,10 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        positionDeleteRowSchema,
         writeProperties);
 
     this.dataFlinkType = dataFlinkType;
     this.equalityDeleteFlinkType = equalityDeleteFlinkType;
-    this.positionDeleteFlinkType = positionDeleteFlinkType;
   }
 
   static Builder builderFor(Table table) {
@@ -96,15 +89,7 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
   }
 
   @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
-    int rowFieldIndex = 
positionDeleteFlinkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME);
-    if (rowFieldIndex >= 0) {
-      // FlinkAvroWriter accepts just the Flink type of the row ignoring the 
path and pos
-      RowType positionDeleteRowFlinkType =
-          (RowType) positionDeleteFlinkType().getTypeAt(rowFieldIndex);
-      builder.createWriterFunc(ignored -> new 
FlinkAvroWriter(positionDeleteRowFlinkType));
-    }
-  }
+  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
 
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
@@ -119,8 +104,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
 
   @Override
   protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> FlinkParquetWriters.buildWriter(positionDeleteFlinkType(), 
msgType));
     builder.transformPaths(path -> StringData.fromString(path.toString()));
   }
 
@@ -138,8 +121,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
 
   @Override
   protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> 
FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema));
     builder.transformPaths(path -> StringData.fromString(path.toString()));
   }
 
@@ -162,17 +143,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
     return equalityDeleteFlinkType;
   }
 
-  private RowType positionDeleteFlinkType() {
-    if (positionDeleteFlinkType == null) {
-      // wrap the optional row schema into the position delete schema that 
contains path and
-      // position
-      Schema positionDeleteSchema = 
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
-      this.positionDeleteFlinkType = 
FlinkSchemaUtil.convert(positionDeleteSchema);
-    }
-
-    return positionDeleteFlinkType;
-  }
-
   public static class Builder {
     private final Table table;
     private FileFormat dataFileFormat;
@@ -184,8 +154,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
     private Schema equalityDeleteRowSchema;
     private RowType equalityDeleteFlinkType;
     private SortOrder equalityDeleteSortOrder;
-    private Schema positionDeleteRowSchema;
-    private RowType positionDeleteFlinkType;
     private Map<String, String> writerProperties = ImmutableMap.of();
 
     public Builder(Table table) {
@@ -257,21 +225,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
       return this;
     }
 
-    public Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
-      this.positionDeleteRowSchema = newPositionDeleteRowSchema;
-      return this;
-    }
-
-    /**
-     * Sets a Flink type for position deletes.
-     *
-     * <p>If not set, the value is derived from the provided Iceberg schema.
-     */
-    public Builder positionDeleteFlinkType(RowType newPositionDeleteFlinkType) 
{
-      this.positionDeleteFlinkType = newPositionDeleteFlinkType;
-      return this;
-    }
-
     /** Sets default writer properties. */
     public Builder writerProperties(Map<String, String> newWriterProperties) {
       this.writerProperties = newWriterProperties;
@@ -296,8 +249,6 @@ public class FlinkFileWriterFactory extends 
BaseFileWriterFactory<RowData> imple
           equalityDeleteRowSchema,
           equalityDeleteFlinkType,
           equalityDeleteSortOrder,
-          positionDeleteRowSchema,
-          positionDeleteFlinkType,
           writerProperties);
     }
   }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
index 414ee40d13..83997093af 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import java.io.IOException;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
@@ -29,6 +30,8 @@ import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.TestFileWriterFactory;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
 
 public class TestFlinkFileWriterFactory extends TestFileWriterFactory<RowData> 
{
 
@@ -44,7 +47,6 @@ public class TestFlinkFileWriterFactory extends 
TestFileWriterFactory<RowData> {
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
@@ -63,4 +65,9 @@ public class TestFlinkFileWriterFactory extends 
TestFileWriterFactory<RowData> {
     }
     return set;
   }
+
+  @Disabled("Position deletes with row data are no longer supported")
+  @Override
+  @TestTemplate
+  public void testPositionDeleteWriterWithRow() throws IOException {}
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
index 939ed2be7d..ade4f15f72 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -55,7 +55,6 @@ public class TestFlinkPartitioningWriters<T> extends 
TestPartitioningWriters<Row
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
index 3050752d1c..e4212ffa7d 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
@@ -44,7 +44,6 @@ public class TestFlinkPositionDeltaWriters extends 
TestPositionDeltaWriters<RowD
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index 03051b69cf..b8f37453e6 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -40,7 +40,6 @@ public class TestFlinkRollingFileWriters extends 
TestRollingFileWriters<RowData>
         .deleteFileFormat(format())
         .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
         .equalityDeleteRowSchema(equalityDeleteRowSchema)
-        .positionDeleteRowSchema(positionDeleteRowSchema)
         .build();
   }
 
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index e6d64ef2c7..bcfcf5ffa5 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -18,6 +18,10 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -38,15 +42,13 @@ public class TestFlinkWriterMetrics extends 
TestWriterMetrics<RowData> {
         .dataSchema(sourceTable.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
-        .positionDeleteRowSchema(sourceTable.schema())
         .build();
   }
 
   @Override
   protected RowData toRow(Integer id, String data, boolean boolValue, Long 
longValue) {
     GenericRowData nested = GenericRowData.of(boolValue, longValue);
-    GenericRowData row = GenericRowData.of(id, StringData.fromString(data), 
nested);
-    return row;
+    return GenericRowData.of(id, StringData.fromString(data), nested);
   }
 
   @Override
@@ -57,4 +59,14 @@ public class TestFlinkWriterMetrics extends 
TestWriterMetrics<RowData> {
     }
     return row;
   }
+
+  @Override
+  protected void checkRowStatistics(Map<Integer, ByteBuffer> bounds) {
+    assertThat(bounds).hasSize(2);
+  }
+
+  @Override
+  protected void checkNotExistingRowStatistics(Map<Integer, ByteBuffer> 
bounds) {
+    assertThat(bounds).isNull();
+  }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 59a4c3118c..67fc79213c 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -55,7 +55,6 @@ public class TestFlinkMergingMetrics extends 
TestMergingMetrics<RowData> {
                 ImmutableMap.of(),
                 PartitionSpec.unpartitioned(),
                 null,
-                null,
                 null)
             .newAppender(
                 Files.localOutput(File.createTempFile("junit", null, 
tempDir)), fileFormat);
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 50a1259c86..a93db17e4a 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -50,6 +50,13 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
   private StructType positionDeleteSparkType;
   private final Map<String, String> writeProperties;
 
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #SparkFileWriterFactory(Table, FileFormat, Schema, StructType, 
SortOrder, FileFormat,
+   *     int[], Schema, StructType, SortOrder, Map)} instead.
+   */
+  @Deprecated
   SparkFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -82,6 +89,36 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
     this.writeProperties = writeProperties != null ? writeProperties : 
ImmutableMap.of();
   }
 
+  SparkFileWriterFactory(
+      Table table,
+      FileFormat dataFileFormat,
+      Schema dataSchema,
+      StructType dataSparkType,
+      SortOrder dataSortOrder,
+      FileFormat deleteFileFormat,
+      int[] equalityFieldIds,
+      Schema equalityDeleteRowSchema,
+      StructType equalityDeleteSparkType,
+      SortOrder equalityDeleteSortOrder,
+      Map<String, String> writeProperties) {
+
+    super(
+        table,
+        dataFileFormat,
+        dataSchema,
+        dataSortOrder,
+        deleteFileFormat,
+        equalityFieldIds,
+        equalityDeleteRowSchema,
+        equalityDeleteSortOrder,
+        ImmutableMap.of());
+
+    this.dataSparkType = dataSparkType;
+    this.equalityDeleteSparkType = equalityDeleteSparkType;
+    this.positionDeleteSparkType = null;
+    this.writeProperties = writeProperties != null ? writeProperties : 
ImmutableMap.of();
+  }
+
   static Builder builderFor(Table table) {
     return new Builder(table);
   }
@@ -255,11 +292,21 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
       return this;
     }
 
+    /**
+     * @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
+    @Deprecated
     Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
       this.positionDeleteRowSchema = newPositionDeleteRowSchema;
       return this;
     }
 
+    /**
+     * @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
+    @Deprecated
     Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
       this.positionDeleteSparkType = newPositionDeleteSparkType;
       return this;
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 668d2ab05e..f1fd7b7ff9 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -590,7 +590,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
       String file = id.getString(fileOrdinal);
       long position = id.getLong(positionOrdinal);
-      positionDelete.set(file, position, null);
+      positionDelete.set(file, position);
       delegate.write(positionDelete, spec, partitionProjection);
     }
 
@@ -822,6 +822,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
       return targetDataFileSize;
     }
 
+    /** @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
     StructType deleteSparkType() {
       return deleteSparkType;
     }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 50a1259c86..a93db17e4a 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -50,6 +50,13 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
   private StructType positionDeleteSparkType;
   private final Map<String, String> writeProperties;
 
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #SparkFileWriterFactory(Table, FileFormat, Schema, StructType, 
SortOrder, FileFormat,
+   *     int[], Schema, StructType, SortOrder, Map)} instead.
+   */
+  @Deprecated
   SparkFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -82,6 +89,36 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
     this.writeProperties = writeProperties != null ? writeProperties : 
ImmutableMap.of();
   }
 
+  SparkFileWriterFactory(
+      Table table,
+      FileFormat dataFileFormat,
+      Schema dataSchema,
+      StructType dataSparkType,
+      SortOrder dataSortOrder,
+      FileFormat deleteFileFormat,
+      int[] equalityFieldIds,
+      Schema equalityDeleteRowSchema,
+      StructType equalityDeleteSparkType,
+      SortOrder equalityDeleteSortOrder,
+      Map<String, String> writeProperties) {
+
+    super(
+        table,
+        dataFileFormat,
+        dataSchema,
+        dataSortOrder,
+        deleteFileFormat,
+        equalityFieldIds,
+        equalityDeleteRowSchema,
+        equalityDeleteSortOrder,
+        ImmutableMap.of());
+
+    this.dataSparkType = dataSparkType;
+    this.equalityDeleteSparkType = equalityDeleteSparkType;
+    this.positionDeleteSparkType = null;
+    this.writeProperties = writeProperties != null ? writeProperties : 
ImmutableMap.of();
+  }
+
   static Builder builderFor(Table table) {
     return new Builder(table);
   }
@@ -255,11 +292,21 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
       return this;
     }
 
+    /**
+     * @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
+    @Deprecated
     Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
       this.positionDeleteRowSchema = newPositionDeleteRowSchema;
       return this;
     }
 
+    /**
+     * @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
+    @Deprecated
     Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
       this.positionDeleteSparkType = newPositionDeleteSparkType;
       return this;
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 39bfe38848..ddad1a749a 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -594,7 +594,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
       String file = id.getString(fileOrdinal);
       long position = id.getLong(positionOrdinal);
-      positionDelete.set(file, position, null);
+      positionDelete.set(file, position);
       delegate.write(positionDelete, spec, partitionProjection);
     }
 
@@ -826,6 +826,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
       return targetDataFileSize;
     }
 
+    /** @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
     StructType deleteSparkType() {
       return deleteSparkType;
     }
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 50a1259c86..a93db17e4a 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -50,6 +50,13 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
   private StructType positionDeleteSparkType;
   private final Map<String, String> writeProperties;
 
+  /**
+   * @deprecated This constructor is deprecated as of version 1.11.0 and will 
be removed in 1.12.0.
+   *     Position deletes that include row data are no longer supported. Use 
{@link
+   *     #SparkFileWriterFactory(Table, FileFormat, Schema, StructType, 
SortOrder, FileFormat,
+   *     int[], Schema, StructType, SortOrder, Map)} instead.
+   */
+  @Deprecated
   SparkFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -82,6 +89,36 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
     this.writeProperties = writeProperties != null ? writeProperties : 
ImmutableMap.of();
   }
 
+  SparkFileWriterFactory(
+      Table table,
+      FileFormat dataFileFormat,
+      Schema dataSchema,
+      StructType dataSparkType,
+      SortOrder dataSortOrder,
+      FileFormat deleteFileFormat,
+      int[] equalityFieldIds,
+      Schema equalityDeleteRowSchema,
+      StructType equalityDeleteSparkType,
+      SortOrder equalityDeleteSortOrder,
+      Map<String, String> writeProperties) {
+
+    super(
+        table,
+        dataFileFormat,
+        dataSchema,
+        dataSortOrder,
+        deleteFileFormat,
+        equalityFieldIds,
+        equalityDeleteRowSchema,
+        equalityDeleteSortOrder,
+        ImmutableMap.of());
+
+    this.dataSparkType = dataSparkType;
+    this.equalityDeleteSparkType = equalityDeleteSparkType;
+    this.positionDeleteSparkType = null;
+    this.writeProperties = writeProperties != null ? writeProperties : 
ImmutableMap.of();
+  }
+
   static Builder builderFor(Table table) {
     return new Builder(table);
   }
@@ -255,11 +292,21 @@ class SparkFileWriterFactory extends 
BaseFileWriterFactory<InternalRow> {
       return this;
     }
 
+    /**
+     * @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
+    @Deprecated
     Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
       this.positionDeleteRowSchema = newPositionDeleteRowSchema;
       return this;
     }
 
+    /**
+     * @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
+    @Deprecated
     Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
       this.positionDeleteSparkType = newPositionDeleteSparkType;
       return this;
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 88ca202cc9..d072397dc6 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -608,7 +608,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
       String file = id.getString(fileOrdinal);
       long position = id.getLong(positionOrdinal);
-      positionDelete.set(file, position, null);
+      positionDelete.set(file, position);
       delegate.write(positionDelete, spec, partitionProjection);
     }
 
@@ -883,6 +883,9 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
       return targetDataFileSize;
     }
 
+    /** @deprecated This method is deprecated as of version 1.11.0 and will be 
removed in 1.12.0.
+     *     Position deletes that include row data are no longer supported.
+     */
     StructType deleteSparkType() {
       return deleteSparkType;
     }

Reply via email to