This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e69418ae8d Spark 3.5: Extend action for rewriting manifests to support deletes (#9020)
e69418ae8d is described below

commit e69418ae8d6d041acae0af4fe59ea3c8b8f8705f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Nov 17 18:34:25 2023 -0800

    Spark 3.5: Extend action for rewriting manifests to support deletes (#9020)
---
 .../{SparkDataFile.java => SparkContentFile.java}  |  75 +++--
 .../org/apache/iceberg/spark/SparkDataFile.java    | 192 +------------
 .../org/apache/iceberg/spark/SparkDeleteFile.java  |  40 +++
 .../spark/actions/RewriteManifestsSparkAction.java | 237 +++++++++++----
 .../spark/actions/TestRewriteManifestsAction.java  | 318 +++++++++++++++++++++
 .../iceberg/spark/source/TestSparkDataFile.java    | 194 ++++++++++---
 6 files changed, 747 insertions(+), 309 deletions(-)

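A minimal usage sketch of the extended action, patterned on the new tests further down in this commit (the loaded Iceberg table handle is assumed to already exist):

    // Rewrite only delete manifests of the current partition spec; without the
    // rewriteIf predicate the action now rewrites data and delete manifests alike.
    RewriteManifests.Result result =
        SparkActions.get()
            .rewriteManifests(table)
            .rewriteIf(manifest -> manifest.content() == ManifestContent.DELETES)
            .execute();
    // rewrittenManifests() and addedManifests() report what was replaced and added.
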
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
similarity index 79%
copy from spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
copy to spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
index d8a3d5714e..3dd8049c13 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
@@ -21,7 +21,9 @@ package org.apache.iceberg.spark;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -31,8 +33,9 @@ import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
-public class SparkDataFile implements DataFile {
+public abstract class SparkContentFile<F> implements ContentFile<F> {
 
+  private final int fileContentPosition;
   private final int filePathPosition;
   private final int fileFormatPosition;
   private final int partitionPosition;
@@ -47,20 +50,16 @@ public class SparkDataFile implements DataFile {
   private final int keyMetadataPosition;
   private final int splitOffsetsPosition;
   private final int sortOrderIdPosition;
+  private final int equalityIdsPosition;
   private final Type lowerBoundsType;
   private final Type upperBoundsType;
   private final Type keyMetadataType;
 
   private final SparkStructLike wrappedPartition;
-  private final StructLike partitionProjection;
+  private final StructLike projectedPartition;
   private Row wrapped;
 
-  public SparkDataFile(Types.StructType type, StructType sparkType) {
-    this(type, null, sparkType);
-  }
-
-  public SparkDataFile(
-      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
+  SparkContentFile(Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType(DataFile.LOWER_BOUNDS.name());
     this.upperBoundsType = type.fieldType(DataFile.UPPER_BOUNDS.name());
     this.keyMetadataType = type.fieldType(DataFile.KEY_METADATA.name());
@@ -71,20 +70,20 @@ public class SparkDataFile implements DataFile {
     if (projectedType != null) {
       Types.StructType projectedPartitionType =
           projectedType.fieldType(DataFile.PARTITION_NAME).asStructType();
-      this.partitionProjection =
-          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+      StructProjection partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType);
+      this.projectedPartition = partitionProjection.wrap(wrappedPartition);
     } else {
-      this.partitionProjection = wrappedPartition;
+      this.projectedPartition = wrappedPartition;
     }
 
     Map<String, Integer> positions = Maps.newHashMap();
-    type.fields()
-        .forEach(
-            field -> {
-              String fieldName = field.name();
-              positions.put(fieldName, fieldPosition(fieldName, sparkType));
-            });
+    for (Types.NestedField field : type.fields()) {
+      String fieldName = field.name();
+      positions.put(fieldName, fieldPosition(fieldName, sparkType));
+    }
 
+    this.fileContentPosition = positions.get(DataFile.CONTENT.name());
     this.filePathPosition = positions.get(DataFile.FILE_PATH.name());
     this.fileFormatPosition = positions.get(DataFile.FILE_FORMAT.name());
     this.partitionPosition = positions.get(DataFile.PARTITION_NAME);
@@ -99,16 +98,19 @@ public class SparkDataFile implements DataFile {
     this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name());
     this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name());
     this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
+    this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
   }
 
-  public SparkDataFile wrap(Row row) {
+  public F wrap(Row row) {
     this.wrapped = row;
     if (wrappedPartition.size() > 0) {
-      this.wrappedPartition.wrap(row.getAs(partitionPosition));
+      wrappedPartition.wrap(row.getAs(partitionPosition));
     }
-    return this;
+    return asFile();
   }
 
+  protected abstract F asFile();
+
   @Override
   public Long pos() {
     return null;
@@ -119,6 +121,14 @@ public class SparkDataFile implements DataFile {
     return -1;
   }
 
+  @Override
+  public FileContent content() {
+    if (wrapped.isNullAt(fileContentPosition)) {
+      return null;
+    }
+    return FileContent.values()[wrapped.getInt(fileContentPosition)];
+  }
+
   @Override
   public CharSequence path() {
     return wrapped.getAs(filePathPosition);
@@ -131,7 +141,7 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public StructLike partition() {
-    return partitionProjection;
+    return projectedPartition;
   }
 
   @Override
@@ -156,16 +166,18 @@ public class SparkDataFile implements DataFile {
 
   @Override
   public Map<Integer, Long> nullValueCounts() {
-    return wrapped.isNullAt(nullValueCountsPosition)
-        ? null
-        : wrapped.getJavaMap(nullValueCountsPosition);
+    if (wrapped.isNullAt(nullValueCountsPosition)) {
+      return null;
+    }
+    return wrapped.getJavaMap(nullValueCountsPosition);
   }
 
   @Override
   public Map<Integer, Long> nanValueCounts() {
-    return wrapped.isNullAt(nanValueCountsPosition)
-        ? null
-        : wrapped.getJavaMap(nanValueCountsPosition);
+    if (wrapped.isNullAt(nanValueCountsPosition)) {
+      return null;
+    }
+    return wrapped.getJavaMap(nanValueCountsPosition);
   }
 
   @Override
@@ -188,12 +200,12 @@ public class SparkDataFile implements DataFile {
   }
 
   @Override
-  public DataFile copy() {
+  public F copy() {
     throw new UnsupportedOperationException("Not implemented: copy");
   }
 
   @Override
-  public DataFile copyWithoutStats() {
+  public F copyWithoutStats() {
     throw new UnsupportedOperationException("Not implemented: copyWithoutStats");
   }
 
@@ -207,6 +219,11 @@ public class SparkDataFile implements DataFile {
     return wrapped.getAs(sortOrderIdPosition);
   }
 
+  @Override
+  public List<Integer> equalityFieldIds() {
+    return wrapped.isNullAt(equalityIdsPosition) ? null : wrapped.getList(equalityIdsPosition);
+  }
+
   private int fieldPosition(String name, StructType sparkType) {
     try {
       return sparkType.fieldIndex(name);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
index d8a3d5714e..543ebf3f9e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java
@@ -18,209 +18,29 @@
  */
 package org.apache.iceberg.spark;
 
-import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Map;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructProjection;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
-public class SparkDataFile implements DataFile {
-
-  private final int filePathPosition;
-  private final int fileFormatPosition;
-  private final int partitionPosition;
-  private final int recordCountPosition;
-  private final int fileSizeInBytesPosition;
-  private final int columnSizesPosition;
-  private final int valueCountsPosition;
-  private final int nullValueCountsPosition;
-  private final int nanValueCountsPosition;
-  private final int lowerBoundsPosition;
-  private final int upperBoundsPosition;
-  private final int keyMetadataPosition;
-  private final int splitOffsetsPosition;
-  private final int sortOrderIdPosition;
-  private final Type lowerBoundsType;
-  private final Type upperBoundsType;
-  private final Type keyMetadataType;
-
-  private final SparkStructLike wrappedPartition;
-  private final StructLike partitionProjection;
-  private Row wrapped;
+public class SparkDataFile extends SparkContentFile<DataFile> implements DataFile {
 
   public SparkDataFile(Types.StructType type, StructType sparkType) {
-    this(type, null, sparkType);
+    super(type, null, sparkType);
   }
 
   public SparkDataFile(
       Types.StructType type, Types.StructType projectedType, StructType sparkType) {
-    this.lowerBoundsType = type.fieldType(DataFile.LOWER_BOUNDS.name());
-    this.upperBoundsType = type.fieldType(DataFile.UPPER_BOUNDS.name());
-    this.keyMetadataType = type.fieldType(DataFile.KEY_METADATA.name());
-
-    Types.StructType partitionType = type.fieldType(DataFile.PARTITION_NAME).asStructType();
-    this.wrappedPartition = new SparkStructLike(partitionType);
-
-    if (projectedType != null) {
-      Types.StructType projectedPartitionType =
-          projectedType.fieldType(DataFile.PARTITION_NAME).asStructType();
-      this.partitionProjection =
-          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
-    } else {
-      this.partitionProjection = wrappedPartition;
-    }
-
-    Map<String, Integer> positions = Maps.newHashMap();
-    type.fields()
-        .forEach(
-            field -> {
-              String fieldName = field.name();
-              positions.put(fieldName, fieldPosition(fieldName, sparkType));
-            });
-
-    this.filePathPosition = positions.get(DataFile.FILE_PATH.name());
-    this.fileFormatPosition = positions.get(DataFile.FILE_FORMAT.name());
-    this.partitionPosition = positions.get(DataFile.PARTITION_NAME);
-    this.recordCountPosition = positions.get(DataFile.RECORD_COUNT.name());
-    this.fileSizeInBytesPosition = positions.get(DataFile.FILE_SIZE.name());
-    this.columnSizesPosition = positions.get(DataFile.COLUMN_SIZES.name());
-    this.valueCountsPosition = positions.get(DataFile.VALUE_COUNTS.name());
-    this.nullValueCountsPosition = positions.get(DataFile.NULL_VALUE_COUNTS.name());
-    this.nanValueCountsPosition = positions.get(DataFile.NAN_VALUE_COUNTS.name());
-    this.lowerBoundsPosition = positions.get(DataFile.LOWER_BOUNDS.name());
-    this.upperBoundsPosition = positions.get(DataFile.UPPER_BOUNDS.name());
-    this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name());
-    this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name());
-    this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
+    super(type, projectedType, sparkType);
   }
 
-  public SparkDataFile wrap(Row row) {
-    this.wrapped = row;
-    if (wrappedPartition.size() > 0) {
-      this.wrappedPartition.wrap(row.getAs(partitionPosition));
-    }
+  @Override
+  protected DataFile asFile() {
     return this;
   }
 
   @Override
-  public Long pos() {
+  public List<Integer> equalityFieldIds() {
     return null;
   }
-
-  @Override
-  public int specId() {
-    return -1;
-  }
-
-  @Override
-  public CharSequence path() {
-    return wrapped.getAs(filePathPosition);
-  }
-
-  @Override
-  public FileFormat format() {
-    return FileFormat.fromString(wrapped.getString(fileFormatPosition));
-  }
-
-  @Override
-  public StructLike partition() {
-    return partitionProjection;
-  }
-
-  @Override
-  public long recordCount() {
-    return wrapped.getAs(recordCountPosition);
-  }
-
-  @Override
-  public long fileSizeInBytes() {
-    return wrapped.getAs(fileSizeInBytesPosition);
-  }
-
-  @Override
-  public Map<Integer, Long> columnSizes() {
-    return wrapped.isNullAt(columnSizesPosition) ? null : wrapped.getJavaMap(columnSizesPosition);
-  }
-
-  @Override
-  public Map<Integer, Long> valueCounts() {
-    return wrapped.isNullAt(valueCountsPosition) ? null : wrapped.getJavaMap(valueCountsPosition);
-  }
-
-  @Override
-  public Map<Integer, Long> nullValueCounts() {
-    return wrapped.isNullAt(nullValueCountsPosition)
-        ? null
-        : wrapped.getJavaMap(nullValueCountsPosition);
-  }
-
-  @Override
-  public Map<Integer, Long> nanValueCounts() {
-    return wrapped.isNullAt(nanValueCountsPosition)
-        ? null
-        : wrapped.getJavaMap(nanValueCountsPosition);
-  }
-
-  @Override
-  public Map<Integer, ByteBuffer> lowerBounds() {
-    Map<?, ?> lowerBounds =
-        wrapped.isNullAt(lowerBoundsPosition) ? null : wrapped.getJavaMap(lowerBoundsPosition);
-    return convert(lowerBoundsType, lowerBounds);
-  }
-
-  @Override
-  public Map<Integer, ByteBuffer> upperBounds() {
-    Map<?, ?> upperBounds =
-        wrapped.isNullAt(upperBoundsPosition) ? null : wrapped.getJavaMap(upperBoundsPosition);
-    return convert(upperBoundsType, upperBounds);
-  }
-
-  @Override
-  public ByteBuffer keyMetadata() {
-    return convert(keyMetadataType, wrapped.get(keyMetadataPosition));
-  }
-
-  @Override
-  public DataFile copy() {
-    throw new UnsupportedOperationException("Not implemented: copy");
-  }
-
-  @Override
-  public DataFile copyWithoutStats() {
-    throw new UnsupportedOperationException("Not implemented: copyWithoutStats");
-  }
-
-  @Override
-  public List<Long> splitOffsets() {
-    return wrapped.isNullAt(splitOffsetsPosition) ? null : wrapped.getList(splitOffsetsPosition);
-  }
-
-  @Override
-  public Integer sortOrderId() {
-    return wrapped.getAs(sortOrderIdPosition);
-  }
-
-  private int fieldPosition(String name, StructType sparkType) {
-    try {
-      return sparkType.fieldIndex(name);
-    } catch (IllegalArgumentException e) {
-      // the partition field is absent for unpartitioned tables
-      if (name.equals(DataFile.PARTITION_NAME) && wrappedPartition.size() == 0) {
-        return -1;
-      }
-      throw e;
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private <T> T convert(Type valueType, Object value) {
-    return (T) SparkValueConverter.convert(valueType, value);
-  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDeleteFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDeleteFile.java
new file mode 100644
index 0000000000..6250a16306
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDeleteFile.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.StructType;
+
+public class SparkDeleteFile extends SparkContentFile<DeleteFile> implements DeleteFile {
+
+  public SparkDeleteFile(Types.StructType type, StructType sparkType) {
+    super(type, null, sparkType);
+  }
+
+  public SparkDeleteFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
+    super(type, projectedType, sparkType);
+  }
+
+  @Override
+  protected DeleteFile asFile() {
+    return this;
+  }
+}
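
As a usage note, a minimal sketch of wrapping rows of the delete_files metadata table with the new SparkDeleteFile, patterned on the TestSparkDataFile changes below (spark, table, and tableLocation are assumed to be set up):

    // Describe the file struct for the current partition spec and wrap each metadata row.
    Types.StructType fileType = DataFile.getType(table.spec().partitionType());
    Dataset<Row> deleteFilesDF =
        spark.read().format("iceberg").load(tableLocation + "#delete_files");
    SparkDeleteFile wrapper = new SparkDeleteFile(fileType, deleteFilesDF.schema());
    for (Row row : deleteFilesDF.collectAsList()) {
      DeleteFile deleteFile = wrapper.wrap(row); // the same wrapper instance is reused per row
    }
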
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 0c08324a1a..5b1d616569 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -21,15 +21,19 @@ package org.apache.iceberg.spark.actions;
 import static org.apache.iceberg.MetadataTableType.ENTRIES;
 
 import java.io.Serializable;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
@@ -50,7 +54,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkContentFile;
 import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkDeleteFile;
 import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
@@ -85,8 +91,12 @@ public class RewriteManifestsSparkAction
   public static final boolean USE_CACHING_DEFAULT = false;
 
   private static final Logger LOG = LoggerFactory.getLogger(RewriteManifestsSparkAction.class);
+  private static final RewriteManifests.Result EMPTY_RESULT =
+      ImmutableRewriteManifests.Result.builder()
+          .rewrittenManifests(ImmutableList.of())
+          .addedManifests(ImmutableList.of())
+          .build();
 
-  private final Encoder<ManifestFile> manifestEncoder;
   private final Table table;
   private final int formatVersion;
   private final long targetManifestSizeBytes;
@@ -98,7 +108,6 @@ public class RewriteManifestsSparkAction
 
   RewriteManifestsSparkAction(SparkSession spark, Table table) {
     super(spark);
-    this.manifestEncoder = Encoders.javaSerialization(ManifestFile.class);
     this.table = table;
     this.spec = table.spec();
     this.targetManifestSizeBytes =
@@ -159,34 +168,49 @@ public class RewriteManifestsSparkAction
   }
 
   private RewriteManifests.Result doExecute() {
-    List<ManifestFile> matchingManifests = findMatchingManifests();
+    List<ManifestFile> rewrittenManifests = Lists.newArrayList();
+    List<ManifestFile> addedManifests = Lists.newArrayList();
+
+    RewriteManifests.Result dataResult = rewriteManifests(ManifestContent.DATA);
+    Iterables.addAll(rewrittenManifests, dataResult.rewrittenManifests());
+    Iterables.addAll(addedManifests, dataResult.addedManifests());
+
+    RewriteManifests.Result deletesResult = rewriteManifests(ManifestContent.DELETES);
+    Iterables.addAll(rewrittenManifests, deletesResult.rewrittenManifests());
+    Iterables.addAll(addedManifests, deletesResult.addedManifests());
+
+    if (rewrittenManifests.isEmpty()) {
+      return EMPTY_RESULT;
+    }
+
+    replaceManifests(rewrittenManifests, addedManifests);
+
+    return ImmutableRewriteManifests.Result.builder()
+        .rewrittenManifests(rewrittenManifests)
+        .addedManifests(addedManifests)
+        .build();
+  }
+
+  private RewriteManifests.Result rewriteManifests(ManifestContent content) {
+    List<ManifestFile> matchingManifests = findMatchingManifests(content);
     if (matchingManifests.isEmpty()) {
-      return ImmutableRewriteManifests.Result.builder()
-          .addedManifests(ImmutableList.of())
-          .rewrittenManifests(ImmutableList.of())
-          .build();
+      return EMPTY_RESULT;
     }
 
     int targetNumManifests = targetNumManifests(totalSizeBytes(matchingManifests));
-
     if (targetNumManifests == 1 && matchingManifests.size() == 1) {
-      return ImmutableRewriteManifests.Result.builder()
-          .addedManifests(ImmutableList.of())
-          .rewrittenManifests(ImmutableList.of())
-          .build();
+      return EMPTY_RESULT;
     }
 
     Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
 
     List<ManifestFile> newManifests;
     if (spec.isUnpartitioned()) {
-      newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, targetNumManifests);
+      newManifests = writeUnpartitionedManifests(content, manifestEntryDF, targetNumManifests);
     } else {
-      newManifests = writeManifestsForPartitionedTable(manifestEntryDF, targetNumManifests);
+      newManifests = writePartitionedManifests(content, manifestEntryDF, targetNumManifests);
     }
 
-    replaceManifests(matchingManifests, newManifests);
-
     return ImmutableRewriteManifests.Result.builder()
         .rewrittenManifests(matchingManifests)
         .addedManifests(newManifests)
@@ -215,41 +239,45 @@ public class RewriteManifestsSparkAction
         .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
   }
 
-  private List<ManifestFile> writeManifestsForUnpartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests) {
+  private List<ManifestFile> writeUnpartitionedManifests(
+      ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
 
-    StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
-    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-    Types.StructType partitionType = spec.partitionType();
-
-    return manifestEntryDF
-        .repartition(numManifests)
-        .mapPartitions(
-            toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
-            manifestEncoder)
-        .collectAsList();
+    WriteManifests<?> writeFunc = newWriteManifestsFunc(content, manifestEntryDF.schema());
+    Dataset<Row> transformedManifestEntryDF = manifestEntryDF.repartition(numManifests);
+    return writeFunc.apply(transformedManifestEntryDF).collectAsList();
   }
 
-  private List<ManifestFile> writeManifestsForPartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests) {
-
-    StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
-    Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-    Types.StructType partitionType = spec.partitionType();
+  private List<ManifestFile> writePartitionedManifests(
+      ManifestContent content, Dataset<Row> manifestEntryDF, int numManifests) {
 
     return withReusableDS(
         manifestEntryDF,
         df -> {
+          WriteManifests<?> writeFunc = newWriteManifestsFunc(content, df.schema());
           Column partitionColumn = df.col("data_file.partition");
-          return df.repartitionByRange(numManifests, partitionColumn)
-              .sortWithinPartitions(partitionColumn)
-              .mapPartitions(
-                  toManifests(manifestWriters(), combinedPartitionType, partitionType, sparkType),
-                  manifestEncoder)
-              .collectAsList();
+          Dataset<Row> transformedDF = repartitionAndSort(df, partitionColumn, numManifests);
+          return writeFunc.apply(transformedDF).collectAsList();
         });
   }
 
+  private WriteManifests<?> newWriteManifestsFunc(ManifestContent content, StructType sparkType) {
+    ManifestWriterFactory writers = manifestWriters();
+
+    StructType sparkFileType = (StructType) sparkType.apply("data_file").dataType();
+    Types.StructType combinedFileType = DataFile.getType(Partitioning.partitionType(table));
+    Types.StructType fileType = DataFile.getType(spec.partitionType());
+
+    if (content == ManifestContent.DATA) {
+      return new WriteDataManifests(writers, combinedFileType, fileType, sparkFileType);
+    } else {
+      return new WriteDeleteManifests(writers, combinedFileType, fileType, sparkFileType);
+    }
+  }
+
+  private Dataset<Row> repartitionAndSort(Dataset<Row> df, Column col, int numPartitions) {
+    return df.repartitionByRange(numPartitions, col).sortWithinPartitions(col);
+  }
+
   private <T, U> U withReusableDS(Dataset<T> ds, Function<Dataset<T>, U> func) {
     boolean useCaching =
         PropertyUtil.propertyAsBoolean(options(), USE_CACHING, USE_CACHING_DEFAULT);
@@ -264,18 +292,31 @@ public class RewriteManifestsSparkAction
     }
   }
 
-  private List<ManifestFile> findMatchingManifests() {
+  private List<ManifestFile> findMatchingManifests(ManifestContent content) {
     Snapshot currentSnapshot = table.currentSnapshot();
 
     if (currentSnapshot == null) {
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests(table.io()).stream()
+    List<ManifestFile> manifests = loadManifests(content, currentSnapshot);
+
+    return manifests.stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
 
+  private List<ManifestFile> loadManifests(ManifestContent content, Snapshot snapshot) {
+    switch (content) {
+      case DATA:
+        return snapshot.dataManifests(table.io());
+      case DELETES:
+        return snapshot.deleteManifests(table.io());
+      default:
+        throw new IllegalArgumentException("Unknown manifest content: " + content);
+    }
+  }
+
   private int targetNumManifests(long totalSizeBytes) {
     return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / targetManifestSizeBytes);
   }
@@ -339,18 +380,82 @@ public class RewriteManifestsSparkAction
         (long) (1.2 * targetManifestSizeBytes));
   }
 
-  private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      ManifestWriterFactory writers,
-      Types.StructType combinedPartitionType,
-      Types.StructType partitionType,
-      StructType sparkType) {
+  private static class WriteDataManifests extends WriteManifests<DataFile> {
+
+    WriteDataManifests(
+        ManifestWriterFactory manifestWriters,
+        Types.StructType combinedPartitionType,
+        Types.StructType partitionType,
+        StructType sparkFileType) {
+      super(manifestWriters, combinedPartitionType, partitionType, sparkFileType);
+    }
+
+    @Override
+    protected SparkDataFile newFileWrapper() {
+      return new SparkDataFile(combinedFileType(), fileType(), sparkFileType());
+    }
+
+    @Override
+    protected RollingManifestWriter<DataFile> newManifestWriter() {
+      return writers().newRollingManifestWriter();
+    }
+  }
+
+  private static class WriteDeleteManifests extends WriteManifests<DeleteFile> {
+
+    WriteDeleteManifests(
+        ManifestWriterFactory manifestWriters,
+        Types.StructType combinedFileType,
+        Types.StructType fileType,
+        StructType sparkFileType) {
+      super(manifestWriters, combinedFileType, fileType, sparkFileType);
+    }
+
+    @Override
+    protected SparkDeleteFile newFileWrapper() {
+      return new SparkDeleteFile(combinedFileType(), fileType(), sparkFileType());
+    }
+
+    @Override
+    protected RollingManifestWriter<DeleteFile> newManifestWriter() {
+      return writers().newRollingDeleteManifestWriter();
+    }
+  }
+
+  private abstract static class WriteManifests<F extends ContentFile<F>>
+      implements MapPartitionsFunction<Row, ManifestFile> {
+
+    private static final Encoder<ManifestFile> MANIFEST_ENCODER =
+        Encoders.javaSerialization(ManifestFile.class);
+
+    private final ManifestWriterFactory writers;
+    private final Types.StructType combinedFileType;
+    private final Types.StructType fileType;
+    private final StructType sparkFileType;
+
+    WriteManifests(
+        ManifestWriterFactory writers,
+        Types.StructType combinedFileType,
+        Types.StructType fileType,
+        StructType sparkFileType) {
+      this.writers = writers;
+      this.combinedFileType = combinedFileType;
+      this.fileType = fileType;
+      this.sparkFileType = sparkFileType;
+    }
+
+    protected abstract SparkContentFile<F> newFileWrapper();
 
-    return rows -> {
-      Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
-      Types.StructType manifestFileType = DataFile.getType(partitionType);
-      SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
+    protected abstract RollingManifestWriter<F> newManifestWriter();
 
-      RollingManifestWriter<DataFile> writer = writers.newRollingManifestWriter();
+    public Dataset<ManifestFile> apply(Dataset<Row> input) {
+      return input.mapPartitions(this, MANIFEST_ENCODER);
+    }
+
+    @Override
+    public Iterator<ManifestFile> call(Iterator<Row> rows) throws Exception {
+      SparkContentFile<F> fileWrapper = newFileWrapper();
+      RollingManifestWriter<F> writer = newManifestWriter();
 
       try {
         while (rows.hasNext()) {
@@ -359,14 +464,30 @@ public class RewriteManifestsSparkAction
           long sequenceNumber = row.getLong(1);
           Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
           Row file = row.getStruct(3);
-          writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
+          writer.existing(fileWrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
         }
       } finally {
         writer.close();
       }
 
       return writer.toManifestFiles().iterator();
-    };
+    }
+
+    protected ManifestWriterFactory writers() {
+      return writers;
+    }
+
+    protected Types.StructType combinedFileType() {
+      return combinedFileType;
+    }
+
+    protected Types.StructType fileType() {
+      return fileType;
+    }
+
+    protected StructType sparkFileType() {
+      return sparkFileType;
+    }
   }
 
   private static class ManifestWriterFactory implements Serializable {
@@ -397,6 +518,14 @@ public class RewriteManifestsSparkAction
       return ManifestFiles.write(formatVersion, spec(), newOutputFile(), null);
     }
 
+    public RollingManifestWriter<DeleteFile> newRollingDeleteManifestWriter() {
+      return new RollingManifestWriter<>(this::newDeleteManifestWriter, maxManifestSizeBytes);
+    }
+
+    private ManifestWriter<DeleteFile> newDeleteManifestWriter() {
+      return ManifestFiles.writeDeleteManifest(formatVersion, spec(), newOutputFile(), null);
+    }
+
     private PartitionSpec spec() {
       return table().specs().get(specId);
     }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index d3932c2e82..e7326c73e8 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -38,10 +38,15 @@ import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
@@ -49,6 +54,9 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.OutputFile;
@@ -61,6 +69,8 @@ import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -649,6 +659,255 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
   }
 
+  @Test
+  public void testRewriteSmallDeleteManifestsNonPartitionedTable() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    // commit data records
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(
+            new ThreeColumnRecord(1, null, "AAAA"),
+            new ThreeColumnRecord(2, "BBBBBBBBBB", "BBBB"),
+            new ThreeColumnRecord(3, "CCCCCCCCCC", "CCCC"),
+            new ThreeColumnRecord(4, "DDDDDDDDDD", "DDDD"));
+    writeRecords(records);
+
+    // commit a position delete file to remove records where c1 = 1 OR c1 = 2
+    List<Pair<CharSequence, Long>> posDeletes = generatePosDeletes("c1 = 1 OR c1 = 2");
+    Pair<DeleteFile, CharSequenceSet> posDeleteWriteResult = writePosDeletes(table, posDeletes);
+    table
+        .newRowDelta()
+        .addDeletes(posDeleteWriteResult.first())
+        .validateDataFilesExist(posDeleteWriteResult.second())
+        .commit();
+
+    // commit an equality delete file to remove all records where c1 = 3
+    DeleteFile eqDeleteFile = writeEqDeletes(table, "c1", 3);
+    table.newRowDelta().addDeletes(eqDeleteFile).commit();
+
+    // the current snapshot should contain 1 data manifest and 2 delete manifests
+    List<ManifestFile> originalManifests = table.currentSnapshot().allManifests(table.io());
+    assertThat(originalManifests).hasSize(3);
+
+    SparkActions actions = SparkActions.get();
+
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
+
+    // the original delete manifests must be combined
+    assertThat(result.rewrittenManifests())
+        .hasSize(2)
+        .allMatch(m -> m.content() == ManifestContent.DELETES);
+    assertThat(result.addedManifests())
+        .hasSize(1)
+        .allMatch(m -> m.content() == ManifestContent.DELETES);
+    assertManifestsLocation(result.addedManifests());
+
+    // the new delete manifest must only contain files with status EXISTING
+    ManifestFile deleteManifest =
+        Iterables.getOnlyElement(table.currentSnapshot().deleteManifests(table.io()));
+    assertThat(deleteManifest.existingFilesCount()).isEqualTo(2);
+    assertThat(deleteManifest.hasAddedFiles()).isFalse();
+    assertThat(deleteManifest.hasDeletedFiles()).isFalse();
+
+    // the preserved data manifest must only contain files with status ADDED
+    ManifestFile dataManifest =
+        Iterables.getOnlyElement(table.currentSnapshot().dataManifests(table.io()));
+    assertThat(dataManifest.hasExistingFiles()).isFalse();
+    assertThat(dataManifest.hasAddedFiles()).isTrue();
+    assertThat(dataManifest.hasDeletedFiles()).isFalse();
+
+    // the table must produce expected records after the rewrite
+    List<ThreeColumnRecord> expectedRecords =
+        Lists.newArrayList(new ThreeColumnRecord(4, "DDDDDDDDDD", "DDDD"));
+    assertThat(actualRecords()).isEqualTo(expectedRecords);
+  }
+
+  @Test
+  public void testRewriteSmallDeleteManifestsPartitionedTable() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    options.put(TableProperties.MANIFEST_MERGE_ENABLED, "false");
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    // commit data records
+    List<ThreeColumnRecord> records =
+        Lists.newArrayList(
+            new ThreeColumnRecord(1, null, "AAAA"),
+            new ThreeColumnRecord(2, "BBBBBBBBBB", "BBBB"),
+            new ThreeColumnRecord(3, "CCCCCCCCCC", "CCCC"),
+            new ThreeColumnRecord(4, "DDDDDDDDDD", "DDDD"),
+            new ThreeColumnRecord(5, "EEEEEEEEEE", "EEEE"));
+    writeRecords(records);
+
+    // commit the first position delete file to remove records where c1 = 1
+    List<Pair<CharSequence, Long>> posDeletes1 = generatePosDeletes("c1 = 1");
+    Pair<DeleteFile, CharSequenceSet> posDeleteWriteResult1 =
+        writePosDeletes(table, TestHelpers.Row.of("AAAA"), posDeletes1);
+    table
+        .newRowDelta()
+        .addDeletes(posDeleteWriteResult1.first())
+        .validateDataFilesExist(posDeleteWriteResult1.second())
+        .commit();
+
+    // commit the second position delete file to remove records where c1 = 2
+    List<Pair<CharSequence, Long>> posDeletes2 = generatePosDeletes("c1 = 2");
+    Pair<DeleteFile, CharSequenceSet> positionDeleteWriteResult2 =
+        writePosDeletes(table, TestHelpers.Row.of("BBBB"), posDeletes2);
+    table
+        .newRowDelta()
+        .addDeletes(positionDeleteWriteResult2.first())
+        .validateDataFilesExist(positionDeleteWriteResult2.second())
+        .commit();
+
+    // commit the first equality delete file to remove records where c1 = 3
+    DeleteFile eqDeleteFile1 = writeEqDeletes(table, TestHelpers.Row.of("CCCC"), "c1", 3);
+    table.newRowDelta().addDeletes(eqDeleteFile1).commit();
+
+    // commit the second equality delete file to remove records where c1 = 4
+    DeleteFile eqDeleteFile2 = writeEqDeletes(table, TestHelpers.Row.of("DDDD"), "c1", 4);
+    table.newRowDelta().addDeletes(eqDeleteFile2).commit();
+
+    // the table must have 1 data manifest and 4 delete manifests
+    List<ManifestFile> originalManifests = table.currentSnapshot().allManifests(table.io());
+    assertThat(originalManifests).hasSize(5);
+
+    // set the target manifest size to have 2 manifests with 2 entries in each after the rewrite
+    List<ManifestFile> originalDeleteManifests =
+        table.currentSnapshot().deleteManifests(table.io());
+    long manifestEntrySizeBytes = computeManifestEntrySizeBytes(originalDeleteManifests);
+    long targetManifestSizeBytes = (long) (1.05 * 2 * manifestEntrySizeBytes);
+
+    table
+        .updateProperties()
+        .set(TableProperties.MANIFEST_TARGET_SIZE_BYTES, String.valueOf(targetManifestSizeBytes))
+        .commit();
+
+    SparkActions actions = SparkActions.get();
+
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> manifest.content() == ManifestContent.DELETES)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
+
+    // the original 4 delete manifests must be replaced with 2 new delete manifests
+    assertThat(result.rewrittenManifests())
+        .hasSize(4)
+        .allMatch(m -> m.content() == ManifestContent.DELETES);
+    assertThat(result.addedManifests())
+        .hasSize(2)
+        .allMatch(m -> m.content() == ManifestContent.DELETES);
+    assertManifestsLocation(result.addedManifests());
+
+    List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests(table.io());
+    assertThat(deleteManifests).hasSize(2);
+
+    // the first new delete manifest must only contain files with status EXISTING
+    ManifestFile deleteManifest1 = deleteManifests.get(0);
+    assertThat(deleteManifest1.existingFilesCount()).isEqualTo(2);
+    assertThat(deleteManifest1.hasAddedFiles()).isFalse();
+    assertThat(deleteManifest1.hasDeletedFiles()).isFalse();
+
+    // the second new delete manifest must only contain files with status EXISTING
+    ManifestFile deleteManifest2 = deleteManifests.get(1);
+    assertThat(deleteManifest2.existingFilesCount()).isEqualTo(2);
+    assertThat(deleteManifest2.hasAddedFiles()).isFalse();
+    assertThat(deleteManifest2.hasDeletedFiles()).isFalse();
+
+    // the table must produce expected records after the rewrite
+    List<ThreeColumnRecord> expectedRecords =
+        Lists.newArrayList(new ThreeColumnRecord(5, "EEEEEEEEEE", "EEEE"));
+    assertThat(actualRecords()).isEqualTo(expectedRecords);
+  }
+
+  @Test
+  public void testRewriteLargeDeleteManifestsPartitionedTable() throws IOException {
+    assumeThat(formatVersion).isGreaterThan(1);
+
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c3").build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, snapshotIdInheritanceEnabled);
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    // generate enough delete files to have a reasonably sized manifest
+    List<DeleteFile> deleteFiles = Lists.newArrayList();
+    for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+      DeleteFile deleteFile = newDeleteFile(table, "c3=" + fileOrdinal);
+      deleteFiles.add(deleteFile);
+    }
+
+    // commit delete files
+    RowDelta rowDelta = table.newRowDelta();
+    for (DeleteFile deleteFile : deleteFiles) {
+      rowDelta.addDeletes(deleteFile);
+    }
+    rowDelta.commit();
+
+    // the current snapshot should contain only 1 delete manifest
+    List<ManifestFile> originalDeleteManifests =
+        table.currentSnapshot().deleteManifests(table.io());
+    ManifestFile originalDeleteManifest = Iterables.getOnlyElement(originalDeleteManifests);
+
+    // set the target manifest size to a small value to force splitting records into multiple files
+    table
+        .updateProperties()
+        .set(
+            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
+            String.valueOf(originalDeleteManifest.length() / 2))
+        .commit();
+
+    SparkActions actions = SparkActions.get();
+
+    String stagingLocation = temp.newFolder().toString();
+
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .stagingLocation(stagingLocation)
+            .execute();
+
+    // the action must rewrite the original delete manifest and add at least 2 new ones
+    assertThat(result.rewrittenManifests())
+        .hasSize(1)
+        .allMatch(m -> m.content() == ManifestContent.DELETES);
+    assertThat(result.addedManifests())
+        .hasSizeGreaterThanOrEqualTo(2)
+        .allMatch(m -> m.content() == ManifestContent.DELETES);
+    assertManifestsLocation(result.addedManifests(), stagingLocation);
+
+    // the current snapshot must return the correct number of delete manifests
+    List<ManifestFile> deleteManifests = table.currentSnapshot().deleteManifests(table.io());
+    assertThat(deleteManifests).hasSizeGreaterThanOrEqualTo(2);
+  }
+
+  private List<ThreeColumnRecord> actualRecords() {
+    return spark
+        .read()
+        .format("iceberg")
+        .load(tableLocation)
+        .as(Encoders.bean(ThreeColumnRecord.class))
+        .sort("c1", "c2", "c3")
+        .collectAsList();
+  }
+
   private void writeRecords(List<ThreeColumnRecord> records) {
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
     writeDF(df);
@@ -721,4 +980,63 @@ public class TestRewriteManifestsAction extends SparkTestBase {
         .withFileSizeInBytes(10)
         .withRecordCount(1);
   }
+
+  private DeleteFile newDeleteFile(Table table, String partitionPath) {
+    return FileMetadata.deleteFileBuilder(table.spec())
+        .ofPositionDeletes()
+        .withPath("/path/to/pos-deletes-" + UUID.randomUUID() + ".parquet")
+        .withFileSizeInBytes(5)
+        .withPartitionPath(partitionPath)
+        .withRecordCount(1)
+        .build();
+  }
+
+  private List<Pair<CharSequence, Long>> generatePosDeletes(String predicate) {
+    List<Row> rows =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation)
+            .selectExpr("_file", "_pos")
+            .where(predicate)
+            .collectAsList();
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList();
+
+    for (Row row : rows) {
+      deletes.add(Pair.of(row.getString(0), row.getLong(1)));
+    }
+
+    return deletes;
+  }
+
+  private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+      Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+    return writePosDeletes(table, null, deletes);
+  }
+
+  private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+      Table table, StructLike partition, List<Pair<CharSequence, Long>> deletes)
+      throws IOException {
+    OutputFile outputFile = Files.localOutput(temp.newFile());
+    return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes);
+  }
+
+  private DeleteFile writeEqDeletes(Table table, String key, Object... values) throws IOException {
+    return writeEqDeletes(table, null, key, values);
+  }
+
+  private DeleteFile writeEqDeletes(Table table, StructLike partition, String key, Object... values)
+      throws IOException {
+    List<Record> deletes = Lists.newArrayList();
+    Schema deleteSchema = table.schema().select(key);
+    Record delete = GenericRecord.create(deleteSchema);
+
+    for (Object value : values) {
+      deletes.add(delete.copy(key, value));
+    }
+
+    OutputFile outputFile = Files.localOutput(temp.newFile());
+    return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes, deleteSchema);
+  }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index 1f8227c94e..b894d32326 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -20,32 +20,46 @@ package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdatePartitionSpec;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.SparkDeleteFile;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -57,7 +71,6 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -133,13 +146,13 @@ public class TestSparkDataFile {
   public void testValueConversion() throws IOException {
     Table table =
         TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);
-    checkSparkDataFile(table);
+    checkSparkContentFiles(table);
   }
 
   @Test
   public void testValueConversionPartitionedTable() throws IOException {
     Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
-    checkSparkDataFile(table);
+    checkSparkContentFiles(table);
   }
 
   @Test
@@ -147,10 +160,10 @@ public class TestSparkDataFile {
     Map<String, String> props = Maps.newHashMap();
     props.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "none");
     Table table = TABLES.create(SCHEMA, SPEC, props, tableLocation);
-    checkSparkDataFile(table);
+    checkSparkContentFiles(table);
   }
 
-  private void checkSparkDataFile(Table table) throws IOException {
+  private void checkSparkContentFiles(Table table) throws IOException {
     Iterable<InternalRow> rows = RandomData.generateSpark(table.schema(), 200, 0);
     JavaRDD<InternalRow> rdd = sparkContext.parallelize(Lists.newArrayList(rows));
     Dataset<Row> df =
@@ -161,66 +174,167 @@ public class TestSparkDataFile {
 
     table.refresh();
 
+    PartitionSpec dataFilesSpec = table.spec();
+
     List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
+    assertThat(manifests).hasSize(1);
 
     List<DataFile> dataFiles = Lists.newArrayList();
     try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
       for (DataFile dataFile : reader) {
-        checkDataFile(dataFile.copy(), DataFiles.builder(table.spec()).copy(dataFile).build());
+        checkDataFile(dataFile.copy(), DataFiles.builder(dataFilesSpec).copy(dataFile).build());
         dataFiles.add(dataFile.copy());
       }
     }
 
-    Dataset<Row> dataFileDF = spark.read().format("iceberg").load(tableLocation + "#files");
+    UpdatePartitionSpec updateSpec = table.updateSpec();
+    for (PartitionField field : dataFilesSpec.fields()) {
+      updateSpec.removeField(field.name());
+    }
+    updateSpec.commit();
 
-    // reorder columns to test arbitrary projections
-    List<Column> columns =
-        Arrays.stream(dataFileDF.columns()).map(ColumnName::new).collect(Collectors.toList());
-    Collections.shuffle(columns);
+    List<DeleteFile> positionDeleteFiles = Lists.newArrayList();
+    List<DeleteFile> equalityDeleteFiles = Lists.newArrayList();
+
+    RowDelta rowDelta = table.newRowDelta();
+
+    for (DataFile dataFile : dataFiles) {
+      DeleteFile positionDeleteFile = createPositionDeleteFile(table, dataFile);
+      positionDeleteFiles.add(positionDeleteFile);
+      rowDelta.addDeletes(positionDeleteFile);
+    }
+
+    DeleteFile equalityDeleteFile1 = createEqualityDeleteFile(table);
+    equalityDeleteFiles.add(equalityDeleteFile1);
+    rowDelta.addDeletes(equalityDeleteFile1);
 
-    List<Row> sparkDataFiles =
-        dataFileDF.select(Iterables.toArray(columns, Column.class)).collectAsList();
+    DeleteFile equalityDeleteFile2 = createEqualityDeleteFile(table);
+    equalityDeleteFiles.add(equalityDeleteFile2);
+    rowDelta.addDeletes(equalityDeleteFile2);
 
-    Assert.assertEquals(
-        "The number of files should match", dataFiles.size(), sparkDataFiles.size());
+    rowDelta.commit();
 
-    Types.StructType dataFileType = DataFile.getType(table.spec().partitionType());
+    Dataset<Row> dataFileDF = spark.read().format("iceberg").load(tableLocation + "#data_files");
+    List<Row> sparkDataFiles = shuffleColumns(dataFileDF).collectAsList();
+    assertThat(sparkDataFiles).hasSameSizeAs(dataFiles);
+
+    Types.StructType dataFileType = DataFile.getType(dataFilesSpec.partitionType());
     StructType sparkDataFileType = sparkDataFiles.get(0).schema();
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkDataFileType);
+    SparkDataFile dataFileWrapper = new SparkDataFile(dataFileType, sparkDataFileType);
 
     for (int i = 0; i < dataFiles.size(); i++) {
-      checkDataFile(dataFiles.get(i), wrapper.wrap(sparkDataFiles.get(i)));
+      checkDataFile(dataFiles.get(i), dataFileWrapper.wrap(sparkDataFiles.get(i)));
+    }
+
+    Dataset<Row> positionDeleteFileDF =
+        spark.read().format("iceberg").load(tableLocation + "#delete_files").where("content = 1");
+    List<Row> sparkPositionDeleteFiles = shuffleColumns(positionDeleteFileDF).collectAsList();
+    assertThat(sparkPositionDeleteFiles).hasSameSizeAs(positionDeleteFiles);
+
+    Types.StructType positionDeleteFileType = DataFile.getType(dataFilesSpec.partitionType());
+    StructType sparkPositionDeleteFileType = sparkPositionDeleteFiles.get(0).schema();
+    SparkDeleteFile positionDeleteFileWrapper =
+        new SparkDeleteFile(positionDeleteFileType, sparkPositionDeleteFileType);
+
+    for (int i = 0; i < positionDeleteFiles.size(); i++) {
+      checkDeleteFile(
+          positionDeleteFiles.get(i),
+          positionDeleteFileWrapper.wrap(sparkPositionDeleteFiles.get(i)));
+    }
+
+    Dataset<Row> equalityDeleteFileDF =
+        spark.read().format("iceberg").load(tableLocation + "#delete_files").where("content = 2");
+    List<Row> sparkEqualityDeleteFiles = shuffleColumns(equalityDeleteFileDF).collectAsList();
+    assertThat(sparkEqualityDeleteFiles).hasSameSizeAs(equalityDeleteFiles);
+
+    Types.StructType equalityDeleteFileType = DataFile.getType(table.spec().partitionType());
+    StructType sparkEqualityDeleteFileType = sparkEqualityDeleteFiles.get(0).schema();
+    SparkDeleteFile equalityDeleteFileWrapper =
+        new SparkDeleteFile(equalityDeleteFileType, sparkEqualityDeleteFileType);
+
+    for (int i = 0; i < equalityDeleteFiles.size(); i++) {
+      checkDeleteFile(
+          equalityDeleteFiles.get(i),
+          equalityDeleteFileWrapper.wrap(sparkEqualityDeleteFiles.get(i)));
     }
   }
 
+  private Dataset<Row> shuffleColumns(Dataset<Row> df) {
+    List<Column> columns =
+        Arrays.stream(df.columns()).map(ColumnName::new).collect(Collectors.toList());
+    Collections.shuffle(columns);
+    return df.select(columns.toArray(new Column[0]));
+  }
+
   private void checkDataFile(DataFile expected, DataFile actual) {
-    Assert.assertEquals("Path must match", expected.path(), actual.path());
-    Assert.assertEquals("Format must match", expected.format(), actual.format());
-    Assert.assertEquals("Record count must match", expected.recordCount(), actual.recordCount());
-    Assert.assertEquals("Size must match", expected.fileSizeInBytes(), actual.fileSizeInBytes());
-    Assert.assertEquals(
-        "Record value counts must match", expected.valueCounts(), actual.valueCounts());
-    Assert.assertEquals(
-        "Record null value counts must match",
-        expected.nullValueCounts(),
-        actual.nullValueCounts());
-    Assert.assertEquals(
-        "Record nan value counts must match", expected.nanValueCounts(), actual.nanValueCounts());
-    Assert.assertEquals("Lower bounds must match", expected.lowerBounds(), actual.lowerBounds());
-    Assert.assertEquals("Upper bounds must match", expected.upperBounds(), actual.upperBounds());
-    Assert.assertEquals("Key metadata must match", expected.keyMetadata(), actual.keyMetadata());
-    Assert.assertEquals("Split offsets must match", expected.splitOffsets(), actual.splitOffsets());
-    Assert.assertEquals("Sort order id must match", expected.sortOrderId(), actual.sortOrderId());
+    assertThat(expected.equalityFieldIds()).isNull();
+    assertThat(actual.equalityFieldIds()).isNull();
+    checkContentFile(expected, actual);
+    checkStructLike(expected.partition(), actual.partition());
+  }
 
+  private void checkDeleteFile(DeleteFile expected, DeleteFile actual) {
+    assertThat(expected.equalityFieldIds()).isEqualTo(actual.equalityFieldIds());
+    checkContentFile(expected, actual);
     checkStructLike(expected.partition(), actual.partition());
   }
 
+  private void checkContentFile(ContentFile<?> expected, ContentFile<?> actual) {
+    assertThat(actual.content()).isEqualTo(expected.content());
+    assertThat(actual.path()).isEqualTo(expected.path());
+    assertThat(actual.format()).isEqualTo(expected.format());
+    assertThat(actual.recordCount()).isEqualTo(expected.recordCount());
+    assertThat(actual.fileSizeInBytes()).isEqualTo(expected.fileSizeInBytes());
+    assertThat(actual.valueCounts()).isEqualTo(expected.valueCounts());
+    assertThat(actual.nullValueCounts()).isEqualTo(expected.nullValueCounts());
+    assertThat(actual.nanValueCounts()).isEqualTo(expected.nanValueCounts());
+    assertThat(actual.lowerBounds()).isEqualTo(expected.lowerBounds());
+    assertThat(actual.upperBounds()).isEqualTo(expected.upperBounds());
+    assertThat(actual.keyMetadata()).isEqualTo(expected.keyMetadata());
+    assertThat(actual.splitOffsets()).isEqualTo(expected.splitOffsets());
+    assertThat(actual.sortOrderId()).isEqualTo(expected.sortOrderId());
+  }
+
   private void checkStructLike(StructLike expected, StructLike actual) {
-    Assert.assertEquals("Struct size should match", expected.size(), actual.size());
+    assertThat(actual.size()).isEqualTo(expected.size());
     for (int i = 0; i < expected.size(); i++) {
-      Assert.assertEquals(
-          "Struct values must match", expected.get(i, Object.class), actual.get(i, Object.class));
+      assertThat(actual.get(i, Object.class)).isEqualTo(expected.get(i, Object.class));
     }
   }
+
+  private DeleteFile createPositionDeleteFile(Table table, DataFile dataFile) {
+    PartitionSpec spec = table.specs().get(dataFile.specId());
+    return FileMetadata.deleteFileBuilder(spec)
+        .ofPositionDeletes()
+        .withPath("/path/to/pos-deletes-" + UUID.randomUUID() + ".parquet")
+        .withFileSizeInBytes(dataFile.fileSizeInBytes() / 4)
+        .withPartition(dataFile.partition())
+        .withRecordCount(2)
+        .withMetrics(
+            new Metrics(
+                2L,
+                null, // no column sizes
+                null, // no value counts
+                null, // no null counts
+                null, // no NaN counts
+                ImmutableMap.of(
+                    MetadataColumns.DELETE_FILE_PATH.fieldId(),
+                    Conversions.toByteBuffer(Types.StringType.get(), dataFile.path())),
+                ImmutableMap.of(
+                    MetadataColumns.DELETE_FILE_PATH.fieldId(),
+                    Conversions.toByteBuffer(Types.StringType.get(), dataFile.path()))))
+        .withEncryptionKeyMetadata(ByteBuffer.allocate(4).putInt(35))
+        .build();
+  }
+
+  private DeleteFile createEqualityDeleteFile(Table table) {
+    return FileMetadata.deleteFileBuilder(table.spec())
+        .ofEqualityDeletes(3, 4)
+        .withPath("/path/to/eq-deletes-" + UUID.randomUUID() + ".parquet")
+        .withFileSizeInBytes(250)
+        .withRecordCount(1)
+        .withSortOrder(SortOrder.unsorted())
+        .withEncryptionKeyMetadata(ByteBuffer.allocate(4).putInt(35))
+        .build();
+  }
 }
