This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e7999a194d Core, Data, Spark 3.5: Support file and partition delete granularity (#9384)
e7999a194d is described below

commit e7999a194dc0f32fba8fb515c9108764ed6ba6b5
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Jan 2 22:26:59 2024 +0100

    Core, Data, Spark 3.5: Support file and partition delete granularity (#9384)
---
 .../org/apache/iceberg/util/CharSequenceUtil.java  |  45 +++++
 .../java/org/apache/iceberg/TableProperties.java   |   4 +
 .../apache/iceberg/deletes/DeleteGranularity.java  |  71 ++++++++
 .../deletes/FileScopedPositionDeleteWriter.java    | 113 +++++++++++++
 .../deletes/SortingPositionOnlyDeleteWriter.java   |  67 ++++++--
 .../iceberg/io/ClusteredPositionDeleteWriter.java  |  25 +++
 .../iceberg/io/FanoutPositionOnlyDeleteWriter.java |  21 ++-
 .../apache/iceberg/io/TestPartitioningWriters.java | 188 +++++++++++++++++++--
 .../spark/extensions/TestMergeOnReadDelete.java    |  45 +++++
 .../spark/extensions/TestMergeOnReadMerge.java     |  57 +++++++
 .../spark/extensions/TestMergeOnReadUpdate.java    |  55 ++++++
 .../iceberg/spark/source/WritersBenchmark.java     |  52 +++++-
 .../org/apache/iceberg/spark/SparkWriteConf.java   |  12 ++
 .../apache/iceberg/spark/SparkWriteOptions.java    |   3 +
 .../spark/source/SparkPositionDeletesRewrite.java  |  15 +-
 .../spark/source/SparkPositionDeltaWrite.java      |  14 +-
 .../apache/iceberg/spark/TestSparkWriteConf.java   |  57 +++++++
 .../TestRewritePositionDeleteFilesAction.java      |  37 ++++
 18 files changed, 846 insertions(+), 35 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java b/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java
new file mode 100644
index 0000000000..b28c90df17
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/CharSequenceUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+public class CharSequenceUtil {
+
+  private CharSequenceUtil() {}
+
+  public static boolean unequalPaths(CharSequence s1, CharSequence s2) {
+    if (s1 == s2) {
+      return false;
+    }
+
+    int s1Length = s1.length();
+    int s2Length = s2.length();
+
+    if (s1Length != s2Length) {
+      return true;
+    }
+
+    for (int index = s1Length - 1; index >= 0; index--) {
+      if (s1.charAt(index) != s2.charAt(index)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 84188c0f78..2267ba03fd 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg;
 
 import java.util.Set;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 
 public class TableProperties {
@@ -334,6 +335,9 @@ public class TableProperties {
   public static final String MAX_REF_AGE_MS = "history.expire.max-ref-age-ms";
   public static final long MAX_REF_AGE_MS_DEFAULT = Long.MAX_VALUE;
 
+  public static final String DELETE_GRANULARITY = "write.delete.granularity";
+  public static final String DELETE_GRANULARITY_DEFAULT = 
DeleteGranularity.PARTITION.toString();
+
   public static final String DELETE_ISOLATION_LEVEL = 
"write.delete.isolation-level";
   public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";
 
diff --git a/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java b/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java
new file mode 100644
index 0000000000..c225192fa1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/deletes/DeleteGranularity.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * An enum that represents the granularity of deletes.
+ *
+ * <p>Under partition granularity, delete writers are directed to group deletes for different data
+ * files into one delete file. This strategy tends to reduce the total number of delete files in the
+ * table. However, a scan for a single data file will require reading delete information for
+ * multiple data files even if those other files are not required for the scan. All irrelevant
+ * deletes will be discarded by readers but reading this extra information will cause overhead. The
+ * overhead can potentially be mitigated via delete file caching.
+ *
+ * <p>Under file granularity, delete writers always organize deletes by their target data file,
+ * creating separate delete files for each referenced data file. This strategy ensures the job
+ * planning does not assign irrelevant deletes to data files and readers only load necessary delete
+ * information. However, it also increases the total number of delete files in the table and may
+ * require a more aggressive approach for delete file compaction.
+ *
+ * <p>Currently, this configuration is only applicable to position deletes.
+ *
+ * <p>Each granularity has its own benefits and drawbacks and should be picked based on a use case.
+ * Regular delete compaction is still required regardless of which granularity is chosen. It is also
+ * possible to use one granularity for ingestion and another one for table maintenance.
+ */
+public enum DeleteGranularity {
+  FILE,
+  PARTITION;
+
+  @Override
+  public String toString() {
+    switch (this) {
+      case FILE:
+        return "file";
+      case PARTITION:
+        return "partition";
+      default:
+        throw new IllegalArgumentException("Unknown delete granularity: " + 
this);
+    }
+  }
+
+  public static DeleteGranularity fromString(String valueAsString) {
+    Preconditions.checkArgument(valueAsString != null, "Value is null");
+    if (FILE.toString().equalsIgnoreCase(valueAsString)) {
+      return FILE;
+    } else if (PARTITION.toString().equalsIgnoreCase(valueAsString)) {
+      return PARTITION;
+    } else {
+      throw new IllegalArgumentException("Unknown delete granularity: " + 
valueAsString);
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
 
b/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
new file mode 100644
index 0000000000..d85a5645fb
--- /dev/null
+++ 
b/core/src/main/java/org/apache/iceberg/deletes/FileScopedPositionDeleteWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.CharSequenceUtil;
+
+/**
+ * A position delete writer that produces a separate delete file for each referenced data file.
+ *
+ * <p>This writer does not keep track of seen deletes and assumes all incoming records are ordered
+ * by file and position as required by the spec. If there is no external process to order the
+ * records, consider using {@link SortingPositionOnlyDeleteWriter} instead.
+ */
+public class FileScopedPositionDeleteWriter<T>
+    implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
+
+  private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> 
writers;
+  private final List<DeleteFile> deleteFiles;
+  private final CharSequenceSet referencedDataFiles;
+
+  private FileWriter<PositionDelete<T>, DeleteWriteResult> currentWriter = 
null;
+  private CharSequence currentPath = null;
+  private boolean closed = false;
+
+  public FileScopedPositionDeleteWriter(
+      Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers) {
+    this.writers = writers;
+    this.deleteFiles = Lists.newArrayList();
+    this.referencedDataFiles = CharSequenceSet.empty();
+  }
+
+  @Override
+  public void write(PositionDelete<T> positionDelete) {
+    writer(positionDelete.path()).write(positionDelete);
+  }
+
+  private FileWriter<PositionDelete<T>, DeleteWriteResult> writer(CharSequence 
path) {
+    if (currentWriter == null) {
+      openCurrentWriter(path);
+    } else if (CharSequenceUtil.unequalPaths(currentPath, path)) {
+      closeCurrentWriter();
+      openCurrentWriter(path);
+    }
+
+    return currentWriter;
+  }
+
+  @Override
+  public long length() {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
implement length");
+  }
+
+  @Override
+  public DeleteWriteResult result() {
+    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closeCurrentWriter();
+      this.closed = true;
+    }
+  }
+
+  private void openCurrentWriter(CharSequence path) {
+    Preconditions.checkState(!closed, "Writer has already been closed");
+    this.currentWriter = writers.get();
+    this.currentPath = path;
+  }
+
+  private void closeCurrentWriter() {
+    if (currentWriter != null) {
+      try {
+        currentWriter.close();
+        DeleteWriteResult result = currentWriter.result();
+        deleteFiles.addAll(result.deleteFiles());
+        referencedDataFiles.addAll(result.referencedDataFiles());
+        this.currentWriter = null;
+        this.currentPath = null;
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close current writer", e);
+      }
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
 
b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
index 8819640e9f..3fc6c5eec9 100644
--- 
a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
+++ 
b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -19,12 +19,17 @@
 package org.apache.iceberg.deletes;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.io.DeleteWriteResult;
 import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.roaringbitmap.longlong.PeekableLongIterator;
 import org.roaringbitmap.longlong.Roaring64Bitmap;
 
@@ -41,12 +46,20 @@ import org.roaringbitmap.longlong.Roaring64Bitmap;
 public class SortingPositionOnlyDeleteWriter<T>
     implements FileWriter<PositionDelete<T>, DeleteWriteResult> {
 
-  private final FileWriter<PositionDelete<T>, DeleteWriteResult> writer;
+  private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> 
writers;
+  private final DeleteGranularity granularity;
   private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
   private DeleteWriteResult result = null;
 
   public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, 
DeleteWriteResult> writer) {
-    this.writer = writer;
+    this(() -> writer, DeleteGranularity.PARTITION);
+  }
+
+  public SortingPositionOnlyDeleteWriter(
+      Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
+      DeleteGranularity granularity) {
+    this.writers = writers;
+    this.granularity = granularity;
     this.positionsByPath = CharSequenceMap.create();
   }
 
@@ -60,7 +73,7 @@ public class SortingPositionOnlyDeleteWriter<T>
 
   @Override
   public long length() {
-    return writer.length();
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
implement length");
   }
 
   @Override
@@ -71,14 +84,44 @@ public class SortingPositionOnlyDeleteWriter<T>
   @Override
   public void close() throws IOException {
     if (result == null) {
-      this.result = writeDeletes();
+      switch (granularity) {
+        case FILE:
+          this.result = writeFileDeletes();
+          return;
+        case PARTITION:
+          this.result = writePartitionDeletes();
+          return;
+        default:
+          throw new UnsupportedOperationException("Unsupported delete 
granularity: " + granularity);
+      }
+    }
+  }
+
+  // write deletes for all data files together
+  private DeleteWriteResult writePartitionDeletes() throws IOException {
+    return writeDeletes(positionsByPath.keySet());
+  }
+
+  // write deletes for different data files into distinct delete files
+  private DeleteWriteResult writeFileDeletes() throws IOException {
+    List<DeleteFile> deleteFiles = Lists.newArrayList();
+    CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
+
+    for (CharSequence path : positionsByPath.keySet()) {
+      DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path));
+      deleteFiles.addAll(writeResult.deleteFiles());
+      referencedDataFiles.addAll(writeResult.referencedDataFiles());
     }
+
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
   }
 
-  private DeleteWriteResult writeDeletes() throws IOException {
+  private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) 
throws IOException {
+    FileWriter<PositionDelete<T>, DeleteWriteResult> writer = writers.get();
+
     try {
       PositionDelete<T> positionDelete = PositionDelete.create();
-      for (CharSequence path : sortedPaths()) {
+      for (CharSequence path : sort(paths)) {
         // the iterator provides values in ascending sorted order
         PeekableLongIterator positions = 
positionsByPath.get(path).getLongIterator();
         while (positions.hasNext()) {
@@ -93,9 +136,13 @@ public class SortingPositionOnlyDeleteWriter<T>
     return writer.result();
   }
 
-  private List<CharSequence> sortedPaths() {
-    List<CharSequence> paths = Lists.newArrayList(positionsByPath.keySet());
-    paths.sort(Comparators.charSequences());
-    return paths;
+  private Collection<CharSequence> sort(Collection<CharSequence> paths) {
+    if (paths.size() <= 1) {
+      return paths;
+    } else {
+      List<CharSequence> sortedPaths = Lists.newArrayList(paths);
+      sortedPaths.sort(Comparators.charSequences());
+      return sortedPaths;
+    }
   }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java 
b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
index c0c26c2b90..c9d911894c 100644
--- 
a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
+++ 
b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -22,6 +22,8 @@ import java.util.List;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.deletes.FileScopedPositionDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.CharSequenceSet;
@@ -38,6 +40,7 @@ public class ClusteredPositionDeleteWriter<T>
   private final OutputFileFactory fileFactory;
   private final FileIO io;
   private final long targetFileSizeInBytes;
+  private final DeleteGranularity granularity;
   private final List<DeleteFile> deleteFiles;
   private final CharSequenceSet referencedDataFiles;
 
@@ -46,10 +49,20 @@ public class ClusteredPositionDeleteWriter<T>
       OutputFileFactory fileFactory,
       FileIO io,
       long targetFileSizeInBytes) {
+    this(writerFactory, fileFactory, io, targetFileSizeInBytes, 
DeleteGranularity.PARTITION);
+  }
+
+  public ClusteredPositionDeleteWriter(
+      FileWriterFactory<T> writerFactory,
+      OutputFileFactory fileFactory,
+      FileIO io,
+      long targetFileSizeInBytes,
+      DeleteGranularity granularity) {
     this.writerFactory = writerFactory;
     this.fileFactory = fileFactory;
     this.io = io;
     this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.granularity = granularity;
     this.deleteFiles = Lists.newArrayList();
     this.referencedDataFiles = CharSequenceSet.empty();
   }
@@ -57,6 +70,18 @@ public class ClusteredPositionDeleteWriter<T>
   @Override
   protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
       PartitionSpec spec, StructLike partition) {
+    switch (granularity) {
+      case FILE:
+        return new FileScopedPositionDeleteWriter<>(() -> 
newRollingWriter(spec, partition));
+      case PARTITION:
+        return newRollingWriter(spec, partition);
+      default:
+        throw new UnsupportedOperationException("Unsupported delete 
granularity: " + granularity);
+    }
+  }
+
+  private RollingPositionDeleteWriter<T> newRollingWriter(
+      PartitionSpec spec, StructLike partition) {
     return new RollingPositionDeleteWriter<>(
         writerFactory, fileFactory, io, targetFileSizeInBytes, spec, 
partition);
   }
diff --git 
a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java 
b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
index 15c1cc6bb4..c6a55064b7 100644
--- 
a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
+++ 
b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -41,6 +42,7 @@ public class FanoutPositionOnlyDeleteWriter<T>
   private final OutputFileFactory fileFactory;
   private final FileIO io;
   private final long targetFileSizeInBytes;
+  private final DeleteGranularity granularity;
   private final List<DeleteFile> deleteFiles;
   private final CharSequenceSet referencedDataFiles;
 
@@ -49,10 +51,20 @@ public class FanoutPositionOnlyDeleteWriter<T>
       OutputFileFactory fileFactory,
       FileIO io,
       long targetFileSizeInBytes) {
+    this(writerFactory, fileFactory, io, targetFileSizeInBytes, 
DeleteGranularity.PARTITION);
+  }
+
+  public FanoutPositionOnlyDeleteWriter(
+      FileWriterFactory<T> writerFactory,
+      OutputFileFactory fileFactory,
+      FileIO io,
+      long targetFileSizeInBytes,
+      DeleteGranularity granularity) {
     this.writerFactory = writerFactory;
     this.fileFactory = fileFactory;
     this.io = io;
     this.targetFileSizeInBytes = targetFileSizeInBytes;
+    this.granularity = granularity;
     this.deleteFiles = Lists.newArrayList();
     this.referencedDataFiles = CharSequenceSet.empty();
   }
@@ -60,10 +72,11 @@ public class FanoutPositionOnlyDeleteWriter<T>
   @Override
   protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(
       PartitionSpec spec, StructLike partition) {
-    FileWriter<PositionDelete<T>, DeleteWriteResult> delegate =
-        new RollingPositionDeleteWriter<>(
-            writerFactory, fileFactory, io, targetFileSizeInBytes, spec, 
partition);
-    return new SortingPositionOnlyDeleteWriter<>(delegate);
+    return new SortingPositionOnlyDeleteWriter<>(
+        () ->
+            new RollingPositionDeleteWriter<>(
+                writerFactory, fileFactory, io, targetFileSizeInBytes, spec, 
partition),
+        granularity);
   }
 
   @Override
diff --git 
a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java 
b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index 32a8c8d2cc..d767743262 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.io;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
@@ -26,6 +28,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -278,11 +281,21 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
   }
 
   @Test
-  public void testClusteredPositionDeleteWriterNoRecords() throws IOException {
+  public void testClusteredPositionDeleteWriterNoRecordsPartitionGranularity() 
throws IOException {
+    checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void testClusteredPositionDeleteWriterNoRecordsFileGranularity() 
throws IOException {
+    checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity.FILE);
+  }
+
+  private void checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity 
deleteGranularity)
+      throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
     ClusteredPositionDeleteWriter<T> writer =
         new ClusteredPositionDeleteWriter<>(
-            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, 
deleteGranularity);
 
     writer.close();
     Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -296,7 +309,18 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
   }
 
   @Test
-  public void testClusteredPositionDeleteWriterMultipleSpecs() throws 
IOException {
+  public void 
testClusteredPositionDeleteWriterMultipleSpecsPartitionGranularity()
+      throws IOException {
+    
checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void testClusteredPositionDeleteWriterMultipleSpecsFileGranularity() 
throws IOException {
+    checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity.FILE);
+  }
+
+  private void 
checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity 
deleteGranularity)
+      throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
 
     // add an unpartitioned data file
@@ -330,7 +354,7 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
 
     ClusteredPositionDeleteWriter<T> writer =
         new ClusteredPositionDeleteWriter<>(
-            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, 
deleteGranularity);
 
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec bucketSpec = table.specs().get(1);
@@ -364,7 +388,19 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
   }
 
   @Test
-  public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() 
throws IOException {
+  public void 
testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsPartitionGranularity()
+      throws IOException {
+    
checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void 
testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsFileGranularity()
+      throws IOException {
+    
checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(DeleteGranularity.FILE);
+  }
+
+  private void checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(
+      DeleteGranularity deleteGranularity) throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
 
     table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
@@ -377,7 +413,7 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
 
     ClusteredPositionDeleteWriter<T> writer =
         new ClusteredPositionDeleteWriter<>(
-            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, 
deleteGranularity);
 
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec bucketSpec = table.specs().get(1);
@@ -419,6 +455,61 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
     writer.close();
   }
 
+  @Test
+  public void testClusteredPositionDeleteWriterPartitionGranularity() throws 
IOException {
+    checkClusteredPositionDeleteWriterGranularity(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void testClusteredPositionDeleteWriterFileGranularity() throws 
IOException {
+    checkClusteredPositionDeleteWriterGranularity(DeleteGranularity.FILE);
+  }
+
+  private void checkClusteredPositionDeleteWriterGranularity(DeleteGranularity 
deleteGranularity)
+      throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add the first data file
+    List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), 
toRow(11, "aaa"));
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, 
table.spec(), null);
+    table.newFastAppend().appendFile(dataFile1).commit();
+
+    // add the second data file
+    List<T> rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"), 
toRow(12, "aaa"));
+    DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, 
table.spec(), null);
+    table.newFastAppend().appendFile(dataFile2).commit();
+
+    // init the delete writer
+    ClusteredPositionDeleteWriter<T> writer =
+        new ClusteredPositionDeleteWriter<>(
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, 
deleteGranularity);
+
+    PartitionSpec spec = table.spec();
+
+    // write deletes for both data files
+    writer.write(positionDelete(dataFile1.path(), 0L, null), spec, null);
+    writer.write(positionDelete(dataFile1.path(), 1L, null), spec, null);
+    writer.write(positionDelete(dataFile2.path(), 0L, null), spec, null);
+    writer.write(positionDelete(dataFile2.path(), 1L, null), spec, null);
+    writer.close();
+
+    // verify the writer result
+    DeleteWriteResult result = writer.result();
+    int expectedNumDeleteFiles = deleteGranularity == DeleteGranularity.FILE ? 
2 : 1;
+    assertThat(result.deleteFiles()).hasSize(expectedNumDeleteFiles);
+    assertThat(result.referencedDataFiles()).hasSize(2);
+    assertThat(result.referencesDataFiles()).isTrue();
+
+    // commit the deletes
+    RowDelta rowDelta = table.newRowDelta();
+    result.deleteFiles().forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+
+    // verify correctness
+    List<T> expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, 
"aaa"));
+    assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
+  }
+
   @Test
   public void testFanoutDataWriterNoRecords() throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
@@ -464,11 +555,21 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
   }
 
   @Test
-  public void testFanoutPositionOnlyDeleteWriterNoRecords() throws IOException 
{
+  public void 
testFanoutPositionOnlyDeleteWriterNoRecordsPartitionGranularity() throws 
IOException {
+    checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void testFanoutPositionOnlyDeleteWriterNoRecordsFileGranularity() 
throws IOException {
+    checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity.FILE);
+  }
+
+  private void checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity 
deleteGranularity)
+      throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
     FanoutPositionOnlyDeleteWriter<T> writer =
         new FanoutPositionOnlyDeleteWriter<>(
-            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, 
deleteGranularity);
 
     writer.close();
     Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -482,7 +583,19 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
   }
 
   @Test
-  public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecords() throws 
IOException {
+  public void 
testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsPartitionGranularity()
+      throws IOException {
+    
checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void 
testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsFileGranularity()
+      throws IOException {
+    
checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(DeleteGranularity.FILE);
+  }
+
+  private void checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(
+      DeleteGranularity deleteGranularity) throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
 
     // add an unpartitioned data file
@@ -516,7 +629,7 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
 
     FanoutPositionOnlyDeleteWriter<T> writer =
         new FanoutPositionOnlyDeleteWriter<>(
-            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, 
deleteGranularity);
 
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec bucketSpec = table.specs().get(1);
@@ -557,4 +670,59 @@ public abstract class TestPartitioningWriters<T> extends 
WriterTestBase<T> {
     List<T> expectedRows = ImmutableList.of(toRow(12, "bbb"));
     Assert.assertEquals("Records should match", toSet(expectedRows), 
actualRowSet("*"));
   }
+
+  @Test
+  public void testFanoutPositionOnlyDeleteWriterPartitionGranularity() throws 
IOException {
+    
checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void testFanoutPositionOnlyDeleteWriterFileGranularity() throws 
IOException {
+    checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity.FILE);
+  }
+
+  private void 
checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity 
deleteGranularity)
+      throws IOException {
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add the first data file
+    List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), 
toRow(11, "aaa"));
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, 
table.spec(), null);
+    table.newFastAppend().appendFile(dataFile1).commit();
+
+    // add the second data file
+    List<T> rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"), 
toRow(12, "aaa"));
+    DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, 
table.spec(), null);
+    table.newFastAppend().appendFile(dataFile2).commit();
+
+    // init the delete writer
+    FanoutPositionOnlyDeleteWriter<T> writer =
+        new FanoutPositionOnlyDeleteWriter<>(
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, 
deleteGranularity);
+
+    PartitionSpec spec = table.spec();
+
+    // write deletes for both data files (the order of records is mixed)
+    writer.write(positionDelete(dataFile1.path(), 1L, null), spec, null);
+    writer.write(positionDelete(dataFile2.path(), 0L, null), spec, null);
+    writer.write(positionDelete(dataFile1.path(), 0L, null), spec, null);
+    writer.write(positionDelete(dataFile2.path(), 1L, null), spec, null);
+    writer.close();
+
+    // verify the writer result
+    DeleteWriteResult result = writer.result();
+    int expectedNumDeleteFiles = deleteGranularity == DeleteGranularity.FILE ? 
2 : 1;
+    assertThat(result.deleteFiles()).hasSize(expectedNumDeleteFiles);
+    assertThat(result.referencedDataFiles()).hasSize(2);
+    assertThat(result.referencesDataFiles()).isTrue();
+
+    // commit the deletes
+    RowDelta rowDelta = table.newRowDelta();
+    result.deleteFiles().forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+
+    // verify correctness
+    List<T> expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, 
"aaa"));
+    assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
+  }
 }
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index 9c0e8235f8..01f24c4dfe 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -28,14 +29,18 @@ import java.util.Map;
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.TestSparkCatalog;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.assertj.core.api.Assertions;
 import org.junit.Assert;
@@ -80,6 +85,46 @@ public class TestMergeOnReadDelete extends TestDelete {
     TestSparkCatalog.clearTables();
   }
 
+  @Test
+  public void testDeleteFileGranularity() throws NoSuchTableException {
+    checkDeleteFileGranularity(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testDeletePartitionGranularity() throws NoSuchTableException {
+    checkDeleteFileGranularity(DeleteGranularity.PARTITION);
+  }
+
+  private void checkDeleteFileGranularity(DeleteGranularity deleteGranularity)
+      throws NoSuchTableException {
+    createAndInitPartitionedTable();
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+    append(tableName, new Employee(1, "hr"), new Employee(2, "hr"));
+    append(tableName, new Employee(3, "hr"), new Employee(4, "hr"));
+    append(tableName, new Employee(1, "hardware"), new Employee(2, 
"hardware"));
+    append(tableName, new Employee(3, "hardware"), new Employee(4, 
"hardware"));
+
+    createBranchIfNeeded();
+
+    sql("DELETE FROM %s WHERE id = 1 OR id = 3", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(5);
+
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = deleteGranularity == 
DeleteGranularity.FILE ? "4" : "2";
+    validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, null);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(2, "hardware"), row(2, "hr"), row(4, "hardware"), 
row(4, "hr")),
+        sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
+  }
+
   @Test
   public void testCommitUnknownException() {
     createAndInitTable("id INT, dep STRING, category STRING");
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index e743b32b45..f9c13d828c 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -18,11 +18,20 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.util.Map;
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Encoders;
+import org.junit.Test;
 
 public class TestMergeOnReadMerge extends TestMerge {
 
@@ -56,4 +65,52 @@ public class TestMergeOnReadMerge extends TestMerge {
         TableProperties.MERGE_MODE,
         RowLevelOperationMode.MERGE_ON_READ.modeName());
   }
+
+  @Test
+  public void testMergeDeleteFileGranularity() {
+    checkMergeDeleteGranularity(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testMergeDeletePartitionGranularity() {
+    checkMergeDeleteGranularity(DeleteGranularity.PARTITION);
+  }
+
+  private void checkMergeDeleteGranularity(DeleteGranularity 
deleteGranularity) {
+    createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* 
empty */);
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, 
\"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, 
\"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, 
\"dep\": \"it\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, 
\"dep\": \"it\" }");
+
+    createBranchIfNeeded();
+
+    createOrReplaceView("source", ImmutableList.of(1, 3, 5), Encoders.INT());
+
+    sql(
+        "MERGE INTO %s AS t USING source AS s "
+            + "ON t.id == s.value "
+            + "WHEN MATCHED THEN "
+            + " DELETE "
+            + "WHEN NOT MATCHED THEN "
+            + " INSERT (id, dep) VALUES (-1, 'other')",
+        commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(5);
+
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = deleteGranularity == 
DeleteGranularity.FILE ? "4" : "2";
+    validateMergeOnRead(currentSnapshot, "3", expectedDeleteFilesCount, "1");
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(-1, "other"), row(2, "hr"), row(2, "it"), row(4, 
"hr"), row(4, "it")),
+        sql("SELECT * FROM %s ORDER BY id ASC, dep ASC", selectTarget()));
+  }
 }
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index 0207d4ce4d..45ef343b2d 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -18,11 +18,19 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import java.util.Map;
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.junit.Test;
 
 public class TestMergeOnReadUpdate extends TestUpdate {
 
@@ -56,4 +64,51 @@ public class TestMergeOnReadUpdate extends TestUpdate {
         TableProperties.UPDATE_MODE,
         RowLevelOperationMode.MERGE_ON_READ.modeName());
   }
+
+  @Test
+  public void testUpdateFileGranularity() {
+    checkUpdateFileGranularity(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testUpdatePartitionGranularity() {
+    checkUpdateFileGranularity(DeleteGranularity.PARTITION);
+  }
+
+  private void checkUpdateFileGranularity(DeleteGranularity deleteGranularity) 
{
+    createAndInitTable("id INT, dep STRING", "PARTITIONED BY (dep)", null /* 
empty */);
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s')",
+        tableName, TableProperties.DELETE_GRANULARITY, deleteGranularity);
+
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }\n" + "{ \"id\": 2, 
\"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"hr\" }\n" + "{ \"id\": 4, 
\"dep\": \"hr\" }");
+    append(tableName, "{ \"id\": 1, \"dep\": \"it\" }\n" + "{ \"id\": 2, 
\"dep\": \"it\" }");
+    append(tableName, "{ \"id\": 3, \"dep\": \"it\" }\n" + "{ \"id\": 4, 
\"dep\": \"it\" }");
+
+    createBranchIfNeeded();
+
+    sql("UPDATE %s SET id = id - 1 WHERE id = 1 OR id = 3", commitTarget());
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    assertThat(table.snapshots()).hasSize(5);
+
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+    String expectedDeleteFilesCount = deleteGranularity == 
DeleteGranularity.FILE ? "4" : "2";
+    validateMergeOnRead(currentSnapshot, "2", expectedDeleteFilesCount, "2");
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(0, "hr"),
+            row(2, "hr"),
+            row(2, "hr"),
+            row(4, "hr"),
+            row(0, "it"),
+            row(2, "it"),
+            row(2, "it"),
+            row(4, "it")),
+        sql("SELECT * FROM %s ORDER BY dep ASC, id ASC", selectTarget()));
+  }
 }
diff --git 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 094fd58443..71813c5a63 100644
--- 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopTables;
@@ -364,8 +365,20 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
   @Benchmark
   @Threads(1)
-  public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole 
blackhole)
+  public void 
writeUnpartitionedClusteredPositionDeleteWriterPartitionGranularity(
+      Blackhole blackhole) throws IOException {
+    writeUnpartitionedClusteredPositionDeleteWriter(blackhole, 
DeleteGranularity.PARTITION);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void 
writeUnpartitionedClusteredPositionDeleteWriterFileGranularity(Blackhole 
blackhole)
       throws IOException {
+    writeUnpartitionedClusteredPositionDeleteWriter(blackhole, 
DeleteGranularity.FILE);
+  }
+
+  private void writeUnpartitionedClusteredPositionDeleteWriter(
+      Blackhole blackhole, DeleteGranularity deleteGranularity) throws 
IOException {
     FileIO io = table().io();
 
     OutputFileFactory fileFactory = newFileFactory();
@@ -374,7 +387,7 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     ClusteredPositionDeleteWriter<InternalRow> writer =
         new ClusteredPositionDeleteWriter<>(
-            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, 
deleteGranularity);
 
     PositionDelete<InternalRow> positionDelete = PositionDelete.create();
     try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
@@ -391,7 +404,20 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
   @Benchmark
   @Threads(1)
-  public void writeUnpartitionedFanoutPositionDeleteWriter(Blackhole 
blackhole) throws IOException {
+  public void 
writeUnpartitionedFanoutPositionDeleteWriterPartitionGranularity(Blackhole 
blackhole)
+      throws IOException {
+    writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, 
DeleteGranularity.PARTITION);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void 
writeUnpartitionedFanoutPositionDeleteWriterFileGranularity(Blackhole blackhole)
+      throws IOException {
+    writeUnpartitionedFanoutPositionDeleteWriterPartition(blackhole, 
DeleteGranularity.FILE);
+  }
+
+  private void writeUnpartitionedFanoutPositionDeleteWriterPartition(
+      Blackhole blackhole, DeleteGranularity deleteGranularity) throws 
IOException {
     FileIO io = table().io();
 
     OutputFileFactory fileFactory = newFileFactory();
@@ -400,7 +426,7 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     FanoutPositionOnlyDeleteWriter<InternalRow> writer =
         new FanoutPositionOnlyDeleteWriter<>(
-            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, 
deleteGranularity);
 
     PositionDelete<InternalRow> positionDelete = PositionDelete.create();
     try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) 
{
@@ -417,8 +443,20 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
   @Benchmark
   @Threads(1)
-  public void writeUnpartitionedFanoutPositionDeleteWriterShuffled(Blackhole 
blackhole)
-      throws IOException {
+  public void 
writeUnpartitionedFanoutPositionDeleteWriterShuffledPartitionGranularity(
+      Blackhole blackhole) throws IOException {
+    writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, 
DeleteGranularity.PARTITION);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void 
writeUnpartitionedFanoutPositionDeleteWriterShuffledFileGranularity(
+      Blackhole blackhole) throws IOException {
+    writeUnpartitionedFanoutPositionDeleteWriterShuffled(blackhole, 
DeleteGranularity.FILE);
+  }
+
+  private void writeUnpartitionedFanoutPositionDeleteWriterShuffled(
+      Blackhole blackhole, DeleteGranularity deleteGranularity) throws 
IOException {
 
     FileIO io = table().io();
 
@@ -428,7 +466,7 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     FanoutPositionOnlyDeleteWriter<InternalRow> writer =
         new FanoutPositionOnlyDeleteWriter<>(
-            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+            writerFactory, fileFactory, io, TARGET_FILE_SIZE_IN_BYTES, 
deleteGranularity);
 
     PositionDelete<InternalRow> positionDelete = PositionDelete.create();
     try (FanoutPositionOnlyDeleteWriter<InternalRow> closeableWriter = writer) 
{
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 1454fc534e..07393a67fe 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -708,4 +709,15 @@ public class SparkWriteConf {
   private double shuffleCompressionRatio(FileFormat outputFileFormat, String 
outputCodec) {
     return SparkCompressionUtil.shuffleCompressionRatio(spark, 
outputFileFormat, outputCodec);
   }
+
+  public DeleteGranularity deleteGranularity() {
+    String valueAsString =
+        confParser
+            .stringConf()
+            .option(SparkWriteOptions.DELETE_GRANULARITY)
+            .tableProperty(TableProperties.DELETE_GRANULARITY)
+            .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)
+            .parse();
+    return DeleteGranularity.fromString(valueAsString);
+  }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 48dfa44c91..d9c4f66b19 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -83,4 +83,7 @@ public class SparkWriteOptions {
 
   // Overrides the advisory partition size
   public static final String ADVISORY_PARTITION_SIZE = 
"advisory-partition-size";
+
+  // Overrides the delete granularity
+  public static final String DELETE_GRANULARITY = "delete-granularity";
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
index a397a069ee..d917794758 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.PositionDeletesTable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
 import org.apache.iceberg.io.FileIO;
@@ -70,6 +71,7 @@ public class SparkPositionDeletesRewrite implements Write {
   private final String queryId;
   private final FileFormat format;
   private final long targetFileSize;
+  private final DeleteGranularity deleteGranularity;
   private final Schema writeSchema;
   private final StructType dsSchema;
   private final String fileSetId;
@@ -103,6 +105,7 @@ public class SparkPositionDeletesRewrite implements Write {
     this.queryId = writeInfo.queryId();
     this.format = writeConf.deleteFileFormat();
     this.targetFileSize = writeConf.targetDeleteFileSize();
+    this.deleteGranularity = writeConf.deleteGranularity();
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
     this.fileSetId = writeConf.rewrittenFileSetId();
@@ -129,6 +132,7 @@ public class SparkPositionDeletesRewrite implements Write {
           queryId,
           format,
           targetFileSize,
+          deleteGranularity,
           writeSchema,
           dsSchema,
           specId,
@@ -179,6 +183,7 @@ public class SparkPositionDeletesRewrite implements Write {
     private final String queryId;
     private final FileFormat format;
     private final Long targetFileSize;
+    private final DeleteGranularity deleteGranularity;
     private final Schema writeSchema;
     private final StructType dsSchema;
     private final int specId;
@@ -190,6 +195,7 @@ public class SparkPositionDeletesRewrite implements Write {
         String queryId,
         FileFormat format,
         long targetFileSize,
+        DeleteGranularity deleteGranularity,
         Schema writeSchema,
         StructType dsSchema,
         int specId,
@@ -199,6 +205,7 @@ public class SparkPositionDeletesRewrite implements Write {
       this.queryId = queryId;
       this.format = format;
       this.targetFileSize = targetFileSize;
+      this.deleteGranularity = deleteGranularity;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
       this.specId = specId;
@@ -241,6 +248,7 @@ public class SparkPositionDeletesRewrite implements Write {
           writerFactoryWithoutRow,
           deleteFileFactory,
           targetFileSize,
+          deleteGranularity,
           dsSchema,
           specId,
           partition);
@@ -289,6 +297,7 @@ public class SparkPositionDeletesRewrite implements Write {
     private final SparkFileWriterFactory writerFactoryWithoutRow;
     private final OutputFileFactory deleteFileFactory;
     private final long targetFileSize;
+    private final DeleteGranularity deleteGranularity;
     private final PositionDelete<InternalRow> positionDelete;
     private final FileIO io;
     private final PartitionSpec spec;
@@ -322,11 +331,13 @@ public class SparkPositionDeletesRewrite implements Write 
{
         SparkFileWriterFactory writerFactoryWithoutRow,
         OutputFileFactory deleteFileFactory,
         long targetFileSize,
+        DeleteGranularity deleteGranularity,
         StructType dsSchema,
         int specId,
         StructLike partition) {
       this.deleteFileFactory = deleteFileFactory;
       this.targetFileSize = targetFileSize;
+      this.deleteGranularity = deleteGranularity;
       this.writerFactoryWithRow = writerFactoryWithRow;
       this.writerFactoryWithoutRow = writerFactoryWithoutRow;
       this.positionDelete = PositionDelete.create();
@@ -387,7 +398,7 @@ public class SparkPositionDeletesRewrite implements Write {
       if (writerWithRow == null) {
         this.writerWithRow =
             new ClusteredPositionDeleteWriter<>(
-                writerFactoryWithRow, deleteFileFactory, io, targetFileSize);
+                writerFactoryWithRow, deleteFileFactory, io, targetFileSize, 
deleteGranularity);
       }
       return writerWithRow;
     }
@@ -396,7 +407,7 @@ public class SparkPositionDeletesRewrite implements Write {
       if (writerWithoutRow == null) {
         this.writerWithoutRow =
             new ClusteredPositionDeleteWriter<>(
-                writerFactoryWithoutRow, deleteFileFactory, io, 
targetFileSize);
+                writerFactoryWithoutRow, deleteFileFactory, io, 
targetFileSize, deleteGranularity);
       }
       return writerWithoutRow;
     }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index dcc949b290..022283631f 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.expressions.Expression;
@@ -441,11 +442,14 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
       FileIO io = table.io();
       boolean inputOrdered = context.inputOrdered();
       long targetFileSize = context.targetDeleteFileSize();
+      DeleteGranularity deleteGranularity = context.deleteGranularity();
 
       if (inputOrdered) {
-        return new ClusteredPositionDeleteWriter<>(writers, files, io, 
targetFileSize);
+        return new ClusteredPositionDeleteWriter<>(
+            writers, files, io, targetFileSize, deleteGranularity);
       } else {
-        return new FanoutPositionOnlyDeleteWriter<>(writers, files, io, 
targetFileSize);
+        return new FanoutPositionOnlyDeleteWriter<>(
+            writers, files, io, targetFileSize, deleteGranularity);
       }
     }
   }
@@ -684,6 +688,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
     private final StructType metadataSparkType;
     private final FileFormat deleteFileFormat;
     private final long targetDeleteFileSize;
+    private final DeleteGranularity deleteGranularity;
     private final String queryId;
     private final boolean useFanoutWriter;
     private final boolean inputOrdered;
@@ -700,6 +705,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
       this.deleteSparkType = info.rowIdSchema().get();
       this.deleteFileFormat = writeConf.deleteFileFormat();
       this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
+      this.deleteGranularity = writeConf.deleteGranularity();
       this.metadataSparkType = info.metadataSchema().get();
       this.queryId = info.queryId();
       this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
@@ -734,6 +740,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
       return targetDeleteFileSize;
     }
 
+    DeleteGranularity deleteGranularity() {
+      return deleteGranularity;
+    }
+
     String queryId() {
       return queryId;
     }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index abf40ebd95..7c3ecac676 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -45,6 +45,7 @@ import static 
org.apache.spark.sql.connector.write.RowLevelOperation.Command.DEL
 import static 
org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE;
 import static 
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
 import java.util.Map;
@@ -52,6 +53,7 @@ import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.internal.SQLConf;
@@ -76,6 +78,61 @@ public class TestSparkWriteConf extends 
SparkTestBaseWithCatalog {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testDeleteGranularityDefault() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+
+    DeleteGranularity value = writeConf.deleteGranularity();
+    assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
+  }
+
+  @Test
+  public void testDeleteGranularityTableProperty() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table
+        .updateProperties()
+        .set(TableProperties.DELETE_GRANULARITY, 
DeleteGranularity.FILE.toString())
+        .commit();
+
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+
+    DeleteGranularity value = writeConf.deleteGranularity();
+    assertThat(value).isEqualTo(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testDeleteGranularityWriteOption() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table
+        .updateProperties()
+        .set(TableProperties.DELETE_GRANULARITY, 
DeleteGranularity.PARTITION.toString())
+        .commit();
+
+    Map<String, String> options =
+        ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, 
DeleteGranularity.FILE.toString());
+
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, options);
+
+    DeleteGranularity value = writeConf.deleteGranularity();
+    assertThat(value).isEqualTo(DeleteGranularity.FILE);
+  }
+
+  @Test
+  public void testDeleteGranularityInvalidValue() {
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateProperties().set(TableProperties.DELETE_GRANULARITY, 
"invalid").commit();
+
+    SparkWriteConf writeConf = new SparkWriteConf(spark, table, 
ImmutableMap.of());
+
+    assertThatThrownBy(writeConf::deleteGranularity)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Unknown delete granularity");
+  }
+
   @Test
   public void testAdvisoryPartitionSize() {
     Table table = validationCatalog.loadTable(tableIdent);
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index d1e33950eb..7c55ff82df 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -57,6 +57,7 @@ import 
org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
 import org.apache.iceberg.actions.SizeBasedFileRewriter;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -131,6 +132,42 @@ public class TestRewritePositionDeleteFilesAction extends 
CatalogTestBase {
     assertThat(result.addedDeleteFilesCount()).as("No added delete 
files").isZero();
   }
 
+  @TestTemplate
+  public void testFileGranularity() throws Exception { // runs the shared granularity check with FILE
+    checkDeleteGranularity(DeleteGranularity.FILE);
+  }
+
+  @TestTemplate
+  public void testPartitionGranularity() throws Exception { // runs the shared granularity check with PARTITION
+    checkDeleteGranularity(DeleteGranularity.PARTITION);
+  }
+
+  private void checkDeleteGranularity(DeleteGranularity deleteGranularity) 
throws Exception { // shared body: rewrites position deletes under the given granularity and checks how many delete files are added
+    Table table = createTableUnpartitioned(2, SCALE); // presumably 2 is the data file count (asserted below) — confirm against the helper
+
+    table
+        .updateProperties()
+        .set(TableProperties.DELETE_GRANULARITY, deleteGranularity.toString())
+        .commit(); // the granularity under test is supplied through the table property
+
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    assertThat(dataFiles).hasSize(2); // precondition: two data files
+
+    writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
+
+    List<DeleteFile> deleteFiles = deleteFiles(table);
+    assertThat(deleteFiles).hasSize(2); // precondition: two delete files exist before the rewrite
+
+    Result result =
+        SparkActions.get(spark)
+            .rewritePositionDeletes(table)
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true") // NOTE(review): presumably forces every delete file into the rewrite — confirm
+            .execute();
+
+    int expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE 
? 2 : 1; // FILE expects 2 added delete files; any other granularity expects 1
+    
assertThat(result.addedDeleteFilesCount()).isEqualTo(expectedDeleteFilesCount);
+  }
+
   @TestTemplate
   public void testUnpartitioned() throws Exception {
     Table table = createTableUnpartitioned(2, SCALE);

Reply via email to