This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new a8ec43d975 Core, Spark 3.5: Remove dangling deletes as part of 
RewriteDataFilesAction (#9724)
a8ec43d975 is described below

commit a8ec43d975d8d3bbacb6880b487208b00cb7361f
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Tue Oct 22 15:28:57 2024 -0700

    Core, Spark 3.5: Remove dangling deletes as part of RewriteDataFilesAction 
(#9724)
---
 .../apache/iceberg/actions/ActionsProvider.java    |   6 +
 .../iceberg/actions/RemoveDanglingDeleteFiles.java |  35 ++
 .../apache/iceberg/actions/RewriteDataFiles.java   |  16 +
 .../actions/BaseRemoveDanglingDeleteFiles.java     |  33 ++
 .../iceberg/actions/BaseRewriteDataFiles.java      |   6 +
 .../org/apache/iceberg/spark/SparkContentFile.java |   7 +-
 .../actions/RemoveDanglingDeletesSparkAction.java  | 179 +++++++++
 .../spark/actions/RewriteDataFilesSparkAction.java |  34 +-
 .../apache/iceberg/spark/actions/SparkActions.java |   6 +
 .../actions/TestRemoveDanglingDeleteAction.java    | 447 +++++++++++++++++++++
 .../spark/actions/TestRewriteDataFilesAction.java  | 215 +++++++++-
 .../TestRewritePositionDeleteFilesAction.java      |   3 +-
 12 files changed, 974 insertions(+), 13 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java 
b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index bcc77b25d6..61750d83fc 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -82,4 +82,10 @@ public interface ActionsProvider {
     throw new UnsupportedOperationException(
         this.getClass().getName() + " does not implement rewriteTablePath");
   }
+
+  /** Instantiates an action to remove dangling delete files from the current 
snapshot. */
+  default RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " does not implement 
removeDanglingDeleteFiles");
+  }
 }
diff --git 
a/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java 
b/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
new file mode 100644
index 0000000000..b0ef0d5e35
--- /dev/null
+++ 
b/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.DeleteFile;
+
+/**
+ * An action that removes dangling delete files from the current snapshot. A 
delete file is dangling
+ * if its deletes no longer apply to any live data files.
+ */
+public interface RemoveDanglingDeleteFiles
+    extends Action<RemoveDanglingDeleteFiles, 
RemoveDanglingDeleteFiles.Result> {
+
+  /** The result of removing dangling delete files. */
+  interface Result {
+    /** Returns the delete files that were removed. */
+    Iterable<DeleteFile> removedDeleteFiles();
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java 
b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index f6ef402708..589b901774 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -106,6 +106,18 @@ public interface RewriteDataFiles
 
   boolean USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
 
+  /**
+   * Remove dangling delete files from the current snapshot after compaction. 
A delete file is
+   * considered dangling if it does not apply to any live data files.
+   *
+   * <p>Both equality and position dangling delete files will be removed.
+   *
+   * <p>Defaults to false.
+   */
+  String REMOVE_DANGLING_DELETES = "remove-dangling-deletes";
+
+  boolean REMOVE_DANGLING_DELETES_DEFAULT = false;
+
   /**
    * Forces the rewrite job order based on the value.
    *
@@ -216,6 +228,10 @@ public interface RewriteDataFiles
     default int failedDataFilesCount() {
       return 
rewriteFailures().stream().mapToInt(FileGroupFailureResult::dataFilesCount).sum();
     }
+
+    default int removedDeleteFilesCount() {
+      return 0;
+    }
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/BaseRemoveDanglingDeleteFiles.java
 
b/core/src/main/java/org/apache/iceberg/actions/BaseRemoveDanglingDeleteFiles.java
new file mode 100644
index 0000000000..3b5ce9e79a
--- /dev/null
+++ 
b/core/src/main/java/org/apache/iceberg/actions/BaseRemoveDanglingDeleteFiles.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.immutables.value.Value;
+
+@Value.Enclosing
+@SuppressWarnings("ImmutablesStyle")
+@Value.Style(
+    typeImmutableEnclosing = "ImmutableRemoveDanglingDeleteFiles",
+    visibilityString = "PUBLIC",
+    builderVisibilityString = "PUBLIC")
+interface BaseRemoveDanglingDeleteFiles extends RemoveDanglingDeleteFiles {
+
+  @Value.Immutable
+  interface Result extends RemoveDanglingDeleteFiles.Result {}
+}
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFiles.java 
b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFiles.java
index 953439484a..2faa1f1b75 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFiles.java
@@ -55,6 +55,12 @@ interface BaseRewriteDataFiles extends RewriteDataFiles {
       return RewriteDataFiles.Result.super.rewrittenBytesCount();
     }
 
+    @Override
+    @Value.Default
+    default int removedDeleteFilesCount() {
+      return RewriteDataFiles.Result.super.removedDeleteFilesCount();
+    }
+
     @Override
     @Value.Default
     default int failedDataFilesCount() {
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
index f756c4cde0..99586f2503 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
@@ -52,6 +52,7 @@ public abstract class SparkContentFile<F> implements 
ContentFile<F> {
   private final int keyMetadataPosition;
   private final int splitOffsetsPosition;
   private final int sortOrderIdPosition;
+  private final int fileSpecIdPosition;
   private final int equalityIdsPosition;
   private final Type lowerBoundsType;
   private final Type upperBoundsType;
@@ -100,6 +101,7 @@ public abstract class SparkContentFile<F> implements 
ContentFile<F> {
     this.keyMetadataPosition = positions.get(DataFile.KEY_METADATA.name());
     this.splitOffsetsPosition = positions.get(DataFile.SPLIT_OFFSETS.name());
     this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
+    this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
     this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
   }
 
@@ -120,7 +122,10 @@ public abstract class SparkContentFile<F> implements 
ContentFile<F> {
 
   @Override
   public int specId() {
-    return -1;
+    if (wrapped.isNullAt(fileSpecIdPosition)) {
+      return -1;
+    }
+    return wrapped.getAs(fileSpecIdPosition);
   }
 
   @Override
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
new file mode 100644
index 0000000000..bbf65f58e1
--- /dev/null
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RemoveDanglingDeletesSparkAction.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.min;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ImmutableRemoveDanglingDeleteFiles;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.SparkDeleteFile;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An action that removes dangling delete files from the current snapshot. A 
delete file is dangling
+ * if its deletes no longer apply to any live data files.
+ *
+ * <p>The following dangling delete files are removed:
+ *
+ * <ul>
+ *   <li>Position delete files with a data sequence number less than that of 
any data file in the
+ *       same partition
+ *   <li>Equality delete files with a data sequence number less than or equal 
to that of any data
+ *       file in the same partition
+ * </ul>
+ */
+class RemoveDanglingDeletesSparkAction
+    extends BaseSnapshotUpdateSparkAction<RemoveDanglingDeletesSparkAction>
+    implements RemoveDanglingDeleteFiles {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RemoveDanglingDeletesSparkAction.class);
+  private final Table table;
+
+  protected RemoveDanglingDeletesSparkAction(SparkSession spark, Table table) {
+    super(spark);
+    this.table = table;
+  }
+
+  @Override
+  protected RemoveDanglingDeletesSparkAction self() {
+    return this;
+  }
+
+  public Result execute() {
+    if (table.specs().size() == 1 && table.spec().isUnpartitioned()) {
+      // ManifestFilterManager already performs this table-wide delete on each 
commit
+      return ImmutableRemoveDanglingDeleteFiles.Result.builder()
+          .removedDeleteFiles(Collections.emptyList())
+          .build();
+    }
+
+    String desc = String.format("Removing dangling delete files in %s", 
table.name());
+    JobGroupInfo info = newJobGroupInfo("REMOVE-DELETES", desc);
+    return withJobGroupInfo(info, this::doExecute);
+  }
+
+  Result doExecute() {
+    RewriteFiles rewriteFiles = table.newRewrite();
+    List<DeleteFile> danglingDeletes = findDanglingDeletes();
+    for (DeleteFile deleteFile : danglingDeletes) {
+      LOG.debug("Removing dangling delete file {}", deleteFile.path());
+      rewriteFiles.deleteFile(deleteFile);
+    }
+
+    if (!danglingDeletes.isEmpty()) {
+      commit(rewriteFiles);
+    }
+
+    return ImmutableRemoveDanglingDeleteFiles.Result.builder()
+        .removedDeleteFiles(danglingDeletes)
+        .build();
+  }
+
+  /**
+   * Dangling delete files can be identified with the following steps:
+   *
+   * <ol>
+   *   <li>Group data files by partition keys and find the minimum data 
sequence number in each
+   *       group.
+   *   <li>Left outer join delete files with partition-grouped data files on 
partition keys.
+   *   <li>Find dangling deletes by comparing each delete file's sequence 
number to its partition's
+   *       minimum data sequence number.
+   *   <li>Collect the result rows to the driver and use {@link SparkDeleteFile 
SparkDeleteFile} to wrap
+   *       rows to valid delete files
+   * </ol>
+   */
+  private List<DeleteFile> findDanglingDeletes() {
+    Dataset<Row> minSequenceNumberByPartition =
+        loadMetadataTable(table, MetadataTableType.ENTRIES)
+            // find live data files
+            .filter("data_file.content == 0 AND status < 2")
+            .selectExpr(
+                "data_file.partition as partition",
+                "data_file.spec_id as spec_id",
+                "sequence_number")
+            .groupBy("partition", "spec_id")
+            .agg(min("sequence_number"))
+            .toDF("grouped_partition", "grouped_spec_id", 
"min_data_sequence_number");
+
+    Dataset<Row> deleteEntries =
+        loadMetadataTable(table, MetadataTableType.ENTRIES)
+            // find live delete files
+            .filter("data_file.content != 0 AND status < 2");
+
+    Column joinOnPartition =
+        deleteEntries
+            .col("data_file.spec_id")
+            .equalTo(minSequenceNumberByPartition.col("grouped_spec_id"))
+            .and(
+                deleteEntries
+                    .col("data_file.partition")
+                    
.equalTo(minSequenceNumberByPartition.col("grouped_partition")));
+
+    Column filterOnDanglingDeletes =
+        col("min_data_sequence_number")
+            // delete files without any data files in partition
+            .isNull()
+            // position delete files without any applicable data files in 
partition
+            .or(
+                col("data_file.content")
+                    .equalTo("1")
+                    
.and(col("sequence_number").$less(col("min_data_sequence_number"))))
+            // equality delete files without any applicable data files in the 
partition
+            .or(
+                col("data_file.content")
+                    .equalTo("2")
+                    
.and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));
+
+    Dataset<Row> danglingDeletes =
+        deleteEntries
+            .join(minSequenceNumberByPartition, joinOnPartition, "left")
+            .filter(filterOnDanglingDeletes)
+            .select("data_file.*");
+    return danglingDeletes.collectAsList().stream()
+        // map on driver because SparkDeleteFile is not serializable
+        .map(row -> deleteFileWrapper(danglingDeletes.schema(), row))
+        .collect(Collectors.toList());
+  }
+
+  private DeleteFile deleteFileWrapper(StructType sparkFileType, Row row) {
+    int specId = row.getInt(row.fieldIndex("spec_id"));
+    Types.StructType combinedFileType = 
DataFile.getType(Partitioning.partitionType(table));
+    // Set correct spec id
+    Types.StructType projection = 
DataFile.getType(table.specs().get(specId).partitionType());
+    return new SparkDeleteFile(combinedFileType, projection, 
sparkFileType).wrap(row);
+  }
+}
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index d33e5e5408..4e381a7bd3 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -40,6 +40,7 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.FileRewriter;
 import org.apache.iceberg.actions.ImmutableRewriteDataFiles;
+import org.apache.iceberg.actions.ImmutableRewriteDataFiles.Result.Builder;
 import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
 import org.apache.iceberg.actions.RewriteFileGroup;
@@ -53,6 +54,7 @@ import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Queues;
@@ -83,7 +85,8 @@ public class RewriteDataFilesSparkAction
           TARGET_FILE_SIZE_BYTES,
           USE_STARTING_SEQUENCE_NUMBER,
           REWRITE_JOB_ORDER,
-          OUTPUT_SPEC_ID);
+          OUTPUT_SPEC_ID,
+          REMOVE_DANGLING_DELETES);
 
   private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
       
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
@@ -95,6 +98,7 @@ public class RewriteDataFilesSparkAction
   private int maxCommits;
   private int maxFailedCommits;
   private boolean partialProgressEnabled;
+  private boolean removeDanglingDeletes;
   private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private FileRewriter<FileScanTask, DataFile> rewriter = null;
@@ -175,11 +179,18 @@ public class RewriteDataFilesSparkAction
 
     Stream<RewriteFileGroup> groupStream = toGroupStream(ctx, 
fileGroupsByPartition);
 
-    if (partialProgressEnabled) {
-      return doExecuteWithPartialProgress(ctx, groupStream, 
commitManager(startingSnapshotId));
-    } else {
-      return doExecute(ctx, groupStream, commitManager(startingSnapshotId));
+    Builder resultBuilder =
+        partialProgressEnabled
+            ? doExecuteWithPartialProgress(ctx, groupStream, 
commitManager(startingSnapshotId))
+            : doExecute(ctx, groupStream, commitManager(startingSnapshotId));
+
+    if (removeDanglingDeletes) {
+      RemoveDanglingDeletesSparkAction action =
+          new RemoveDanglingDeletesSparkAction(spark(), table);
+      int removedCount = Iterables.size(action.execute().removedDeleteFiles());
+      resultBuilder.removedDeleteFilesCount(removedCount);
     }
+    return resultBuilder.build();
   }
 
   StructLikeMap<List<List<FileScanTask>>> planFileGroups(long 
startingSnapshotId) {
@@ -264,7 +275,7 @@ public class RewriteDataFilesSparkAction
         table, startingSnapshotId, useStartingSequenceNumber, commitSummary());
   }
 
-  private Result doExecute(
+  private Builder doExecute(
       RewriteExecutionContext ctx,
       Stream<RewriteFileGroup> groupStream,
       RewriteDataFilesCommitManager commitManager) {
@@ -326,10 +337,10 @@ public class RewriteDataFilesSparkAction
 
     List<FileGroupRewriteResult> rewriteResults =
         
rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
-    return 
ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+    return 
ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
   }
 
-  private Result doExecuteWithPartialProgress(
+  private Builder doExecuteWithPartialProgress(
       RewriteExecutionContext ctx,
       Stream<RewriteFileGroup> groupStream,
       RewriteDataFilesCommitManager commitManager) {
@@ -386,8 +397,7 @@ public class RewriteDataFilesSparkAction
 
     return ImmutableRewriteDataFiles.Result.builder()
         .rewriteResults(toRewriteResults(commitService.results()))
-        .rewriteFailures(rewriteFailures)
-        .build();
+        .rewriteFailures(rewriteFailures);
   }
 
   Stream<RewriteFileGroup> toGroupStream(
@@ -456,6 +466,10 @@ public class RewriteDataFilesSparkAction
         PropertyUtil.propertyAsBoolean(
             options(), USE_STARTING_SEQUENCE_NUMBER, 
USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
 
+    removeDanglingDeletes =
+        PropertyUtil.propertyAsBoolean(
+            options(), REMOVE_DANGLING_DELETES, 
REMOVE_DANGLING_DELETES_DEFAULT);
+
     rewriteJobOrder =
         RewriteJobOrder.fromName(
             PropertyUtil.propertyAsString(options(), REWRITE_JOB_ORDER, 
REWRITE_JOB_ORDER_DEFAULT));
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
index f845386d30..ba9fa2e7b4 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SparkActions.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.actions;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.actions.ComputeTableStats;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
 import org.apache.spark.sql.SparkSession;
@@ -102,4 +103,9 @@ public class SparkActions implements ActionsProvider {
   public ComputeTableStats computeTableStats(Table table) {
     return new ComputeTableStatsSparkAction(spark, table);
   }
+
+  @Override
+  public RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
+    return new RemoveDanglingDeletesSparkAction(spark, table);
+  }
 }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
new file mode 100644
index 0000000000..e15b2fb217
--- /dev/null
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveDanglingDeleteAction.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RemoveDanglingDeleteFiles;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Encoders;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import scala.Tuple2;
+
+public class TestRemoveDanglingDeleteAction extends TestBase {
+
+  private static final HadoopTables TABLES = new HadoopTables(new 
Configuration());
+  private static final Schema SCHEMA =
+      new Schema(
+          optional(1, "c1", Types.StringType.get()),
+          optional(2, "c2", Types.StringType.get()),
+          optional(3, "c3", Types.StringType.get()));
+
+  private static final PartitionSpec SPEC = 
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
+  static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=a") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DataFile FILE_A2 =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=a") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=b") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DataFile FILE_B2 =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=b") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=c") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DataFile FILE_C2 =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=c") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=d") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DataFile FILE_D2 =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=d") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_A_POS_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-a-pos-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=a") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_A2_POS_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-a2-pos-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=a") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_A_EQ_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofEqualityDeletes()
+          .withPath("/path/to/data-a-eq-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=a") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_A2_EQ_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofEqualityDeletes()
+          .withPath("/path/to/data-a2-eq-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=a") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_B_POS_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-b-pos-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=b") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_B2_POS_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofPositionDeletes()
+          .withPath("/path/to/data-b2-pos-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=b") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_B_EQ_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofEqualityDeletes()
+          .withPath("/path/to/data-b-eq-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=b") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_B2_EQ_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofEqualityDeletes()
+          .withPath("/path/to/data-b2-eq-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("c1=b") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  static final DataFile FILE_UNPARTITIONED =
+      DataFiles.builder(PartitionSpec.unpartitioned())
+          .withPath("/path/to/data-unpartitioned.parquet")
+          .withFileSizeInBytes(10)
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_UNPARTITIONED_POS_DELETE =
+      FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+          .ofPositionDeletes() // position deletes, as the constant name and file path indicate
+          .withPath("/path/to/data-unpartitioned-pos-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withRecordCount(1)
+          .build();
+  static final DeleteFile FILE_UNPARTITIONED_EQ_DELETE =
+      FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+          .ofEqualityDeletes()
+          .withPath("/path/to/data-unpartitioned-eq-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withRecordCount(1)
+          .build();
+
+  @TempDir private Path temp;
+
+  private String tableLocation = null;
+  private Table table;
+
+  @BeforeEach
+  public void before() throws Exception {
+    File tableDir = temp.resolve("junit").toFile();
+    this.tableLocation = tableDir.toURI().toString();
+  }
+
+  @AfterEach
+  public void after() {
+    TABLES.dropTable(tableLocation);
+  }
+
+  private void setupPartitionedTable() {
+    this.table =
+        TABLES.create(
+            SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, 
"2"), tableLocation);
+  }
+
+  private void setupUnpartitionedTable() {
+    this.table =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"),
+            tableLocation);
+  }
+
+  @Test
+  public void testPartitionedDeletesWithLesserSeqNo() {
+    setupPartitionedTable();
+
+    // Add Data Files
+    
table.newAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // Add Delete Files
+    table
+        .newRowDelta()
+        .addDeletes(FILE_A_POS_DELETES)
+        .addDeletes(FILE_A2_POS_DELETES)
+        .addDeletes(FILE_B_POS_DELETES)
+        .addDeletes(FILE_B2_POS_DELETES)
+        .addDeletes(FILE_A_EQ_DELETES)
+        .addDeletes(FILE_A2_EQ_DELETES)
+        .addDeletes(FILE_B_EQ_DELETES)
+        .addDeletes(FILE_B2_EQ_DELETES)
+        .commit();
+
+    // Add More Data Files
+    table
+        .newAppend()
+        .appendFile(FILE_A2)
+        .appendFile(FILE_B2)
+        .appendFile(FILE_C2)
+        .appendFile(FILE_D2)
+        .commit();
+
+    List<Tuple2<Long, String>> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation + "#entries")
+            .select("sequence_number", "data_file.file_path")
+            .sort("sequence_number", "data_file.file_path")
+            .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+            .collectAsList();
+    List<Tuple2<Long, String>> expected =
+        ImmutableList.of(
+            Tuple2.apply(1L, FILE_B.path().toString()),
+            Tuple2.apply(1L, FILE_C.path().toString()),
+            Tuple2.apply(1L, FILE_D.path().toString()),
+            Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+            Tuple2.apply(3L, FILE_A2.path().toString()),
+            Tuple2.apply(3L, FILE_B2.path().toString()),
+            Tuple2.apply(3L, FILE_C2.path().toString()),
+            Tuple2.apply(3L, FILE_D2.path().toString()));
+    assertThat(actual).isEqualTo(expected);
+
+    RemoveDanglingDeleteFiles.Result result =
+        SparkActions.get().removeDanglingDeleteFiles(table).execute();
+
+    // All Delete files of the FILE A partition should be removed
+    // because there are no data files in partition with a lesser sequence number
+
+    Set<CharSequence> removedDeleteFiles =
+        StreamSupport.stream(result.removedDeleteFiles().spliterator(), false)
+            .map(DeleteFile::path)
+            .collect(Collectors.toSet());
+    assertThat(removedDeleteFiles)
+        .as("Expected 4 delete files removed")
+        .hasSize(4)
+        .containsExactlyInAnyOrder(
+            FILE_A_POS_DELETES.path(),
+            FILE_A2_POS_DELETES.path(),
+            FILE_A_EQ_DELETES.path(),
+            FILE_A2_EQ_DELETES.path());
+
+    List<Tuple2<Long, String>> actualAfter =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation + "#entries")
+            .filter("status < 2") // live files
+            .select("sequence_number", "data_file.file_path")
+            .sort("sequence_number", "data_file.file_path")
+            .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+            .collectAsList();
+    List<Tuple2<Long, String>> expectedAfter =
+        ImmutableList.of(
+            Tuple2.apply(1L, FILE_B.path().toString()),
+            Tuple2.apply(1L, FILE_C.path().toString()),
+            Tuple2.apply(1L, FILE_D.path().toString()),
+            Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+            Tuple2.apply(3L, FILE_A2.path().toString()),
+            Tuple2.apply(3L, FILE_B2.path().toString()),
+            Tuple2.apply(3L, FILE_C2.path().toString()),
+            Tuple2.apply(3L, FILE_D2.path().toString()));
+    assertThat(actualAfter).isEqualTo(expectedAfter);
+  }
+
+  @Test
+  public void testPartitionedDeletesWithEqSeqNo() {
+    setupPartitionedTable();
+
+    // Add Data Files
+    
table.newAppend().appendFile(FILE_A).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // Add Data Files with EQ and POS deletes
+    table
+        .newRowDelta()
+        .addRows(FILE_A2)
+        .addRows(FILE_B2)
+        .addRows(FILE_C2)
+        .addRows(FILE_D2)
+        .addDeletes(FILE_A_POS_DELETES)
+        .addDeletes(FILE_A2_POS_DELETES)
+        .addDeletes(FILE_A_EQ_DELETES)
+        .addDeletes(FILE_A2_EQ_DELETES)
+        .addDeletes(FILE_B_POS_DELETES)
+        .addDeletes(FILE_B2_POS_DELETES)
+        .addDeletes(FILE_B_EQ_DELETES)
+        .addDeletes(FILE_B2_EQ_DELETES)
+        .commit();
+
+    List<Tuple2<Long, String>> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation + "#entries")
+            .select("sequence_number", "data_file.file_path")
+            .sort("sequence_number", "data_file.file_path")
+            .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+            .collectAsList();
+    List<Tuple2<Long, String>> expected =
+        ImmutableList.of(
+            Tuple2.apply(1L, FILE_A.path().toString()),
+            Tuple2.apply(1L, FILE_C.path().toString()),
+            Tuple2.apply(1L, FILE_D.path().toString()),
+            Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A2.path().toString()),
+            Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B2.path().toString()),
+            Tuple2.apply(2L, FILE_B2_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_C2.path().toString()),
+            Tuple2.apply(2L, FILE_D2.path().toString()));
+    assertThat(actual).isEqualTo(expected);
+
+    RemoveDanglingDeleteFiles.Result result =
+        SparkActions.get().removeDanglingDeleteFiles(table).execute();
+
+    // Eq Delete files of the FILE B partition should be removed
+    // because there are no data files in partition with a lesser sequence number
+    Set<CharSequence> removedDeleteFiles =
+        StreamSupport.stream(result.removedDeleteFiles().spliterator(), false)
+            .map(DeleteFile::path)
+            .collect(Collectors.toSet());
+    assertThat(removedDeleteFiles)
+        .as("Expected two delete files removed")
+        .hasSize(2)
+        .containsExactlyInAnyOrder(FILE_B_EQ_DELETES.path(), 
FILE_B2_EQ_DELETES.path());
+
+    List<Tuple2<Long, String>> actualAfter =
+        spark
+            .read()
+            .format("iceberg")
+            .load(tableLocation + "#entries")
+            .filter("status < 2") // live files
+            .select("sequence_number", "data_file.file_path")
+            .sort("sequence_number", "data_file.file_path")
+            .as(Encoders.tuple(Encoders.LONG(), Encoders.STRING()))
+            .collectAsList();
+    List<Tuple2<Long, String>> expectedAfter =
+        ImmutableList.of(
+            Tuple2.apply(1L, FILE_A.path().toString()),
+            Tuple2.apply(1L, FILE_C.path().toString()),
+            Tuple2.apply(1L, FILE_D.path().toString()),
+            Tuple2.apply(2L, FILE_A_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A2.path().toString()),
+            Tuple2.apply(2L, FILE_A2_EQ_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_A2_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_B2.path().toString()),
+            Tuple2.apply(2L, FILE_B2_POS_DELETES.path().toString()),
+            Tuple2.apply(2L, FILE_C2.path().toString()),
+            Tuple2.apply(2L, FILE_D2.path().toString()));
+    assertThat(actualAfter).isEqualTo(expectedAfter);
+  }
+
+  @Test
+  public void testUnpartitionedTable() {
+    setupUnpartitionedTable();
+
+    table
+        .newRowDelta()
+        .addDeletes(FILE_UNPARTITIONED_POS_DELETE)
+        .addDeletes(FILE_UNPARTITIONED_EQ_DELETE)
+        .commit();
+    table.newAppend().appendFile(FILE_UNPARTITIONED).commit();
+
+    RemoveDanglingDeleteFiles.Result result =
+        SparkActions.get().removeDanglingDeleteFiles(table).execute();
+    assertThat(result.removedDeleteFiles()).as("No-op for unpartitioned 
tables").isEmpty();
+  }
+}
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index b67ee87c7d..2de83f8b35 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -24,6 +24,7 @@ import static 
org.apache.iceberg.types.Types.NestedField.required;
 import static org.apache.spark.sql.functions.current_date;
 import static org.apache.spark.sql.functions.date_add;
 import static org.apache.spark.sql.functions.expr;
+import static org.apache.spark.sql.functions.min;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -56,6 +57,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
@@ -73,7 +75,9 @@ import org.apache.iceberg.actions.RewriteFileGroup;
 import org.apache.iceberg.actions.SizeBasedDataRewriter;
 import org.apache.iceberg.actions.SizeBasedFileRewriter;
 import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedFiles;
@@ -86,6 +90,7 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -105,9 +110,11 @@ import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeMap;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.jupiter.api.BeforeAll;
@@ -128,6 +135,8 @@ public class TestRewriteDataFilesAction extends TestBase {
           optional(2, "c2", Types.StringType.get()),
           optional(3, "c3", Types.StringType.get()));
 
+  private static final PartitionSpec SPEC = 
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+
   @TempDir private Path temp;
 
   private final FileRewriteCoordinator coordinator = 
FileRewriteCoordinator.get();
@@ -336,6 +345,125 @@ public class TestRewriteDataFilesAction extends TestBase {
     assertThat(actualRecords).as("7 rows are removed").hasSize(total - 7);
   }
 
+  @Test
+  public void testRemoveDangledEqualityDeletesPartitionEvolution() {
+    Table table =
+        TABLES.create(
+            SCHEMA,
+            SPEC,
+            Collections.singletonMap(TableProperties.FORMAT_VERSION, "2"),
+            tableLocation);
+
+    // data seq = 1, write 4 files in 2 partitions
+    List<ThreeColumnRecord> records1 =
+        Lists.newArrayList(
+            new ThreeColumnRecord(1, null, "AAAA"), new ThreeColumnRecord(1, 
"BBBBBBBBBB", "BBBB"));
+    writeRecords(records1);
+    List<ThreeColumnRecord> records2 =
+        Lists.newArrayList(
+            new ThreeColumnRecord(0, "CCCCCCCCCC", "CCCC"),
+            new ThreeColumnRecord(0, "DDDDDDDDDD", "DDDD"));
+    writeRecords(records2);
+    table.refresh();
+    shouldHaveFiles(table, 4);
+
+    // data seq = 2 & 3, write 2 equality deletes: one in c1=1 and one in c1=2 (no data yet)
+    writeEqDeleteRecord(table, "c1", 1, "c3", "AAAA");
+    writeEqDeleteRecord(table, "c1", 2, "c3", "CCCC");
+    table.refresh();
+    Set<DeleteFile> existingDeletes = TestHelpers.deleteFiles(table);
+    assertThat(existingDeletes)
+        .as("Only one equality delete c1=1 is used in query planning")
+        .hasSize(1);
+
+    // partition evolution
+    table.refresh();
+    table.updateSpec().addField(Expressions.ref("c3")).commit();
+
+    // data seq = 4, write 2 new data files in both partitions for evolved spec
+    List<ThreeColumnRecord> records3 =
+        Lists.newArrayList(
+            new ThreeColumnRecord(1, "A", "CCCC"), new ThreeColumnRecord(2, 
"D", "DDDD"));
+    writeRecords(records3);
+
+    List<Object[]> originalData = currentData();
+
+    RewriteDataFiles.Result result =
+        basicRewrite(table)
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .filter(Expressions.equal("c1", 1))
+            .option(RewriteDataFiles.REMOVE_DANGLING_DELETES, "true")
+            .execute();
+
+    existingDeletes = TestHelpers.deleteFiles(table);
+    assertThat(existingDeletes).as("Shall pruned dangling deletes after 
rewrite").hasSize(0);
+
+    assertThat(result)
+        .extracting(
+            Result::addedDataFilesCount,
+            Result::rewrittenDataFilesCount,
+            Result::removedDeleteFilesCount)
+        .as("Should compact 3 data files into 2 and remove both dangled 
equality delete file")
+        .containsExactly(2, 3, 2);
+    shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 == 
1", 5);
+
+    List<Object[]> postRewriteData = currentData();
+    assertEquals("We shouldn't have changed the data", originalData, 
postRewriteData);
+
+    shouldHaveSnapshots(table, 7);
+    shouldHaveFiles(table, 5);
+  }
+
+  @Test
+  public void testRemoveDangledPositionDeletesPartitionEvolution() {
+    Table table =
+        TABLES.create(
+            SCHEMA,
+            SPEC,
+            Collections.singletonMap(TableProperties.FORMAT_VERSION, "2"),
+            tableLocation);
+
+    // data seq = 1, write 4 files in 2 partitions
+    writeRecords(2, 2, 2);
+    List<DataFile> dataFilesBefore = TestHelpers.dataFiles(table, null);
+    shouldHaveFiles(table, 4);
+
+    // data seq = 2, write 1 position delete file in c1=1
+    table
+        .newRowDelta()
+        .addDeletes(writePosDeletesToFile(table, dataFilesBefore.get(3), 
1).get(0))
+        .commit();
+
+    // partition evolution
+    table.updateSpec().addField(Expressions.ref("c3")).commit();
+
+    // data seq = 3, write 1 new data file in c1=1 for evolved spec
+    writeRecords(1, 1, 1);
+    shouldHaveFiles(table, 5);
+    List<Object[]> expectedRecords = currentData();
+
+    Result result =
+        actions()
+            .rewriteDataFiles(table)
+            .filter(Expressions.equal("c1", 1))
+            .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
+            .option(RewriteDataFiles.REMOVE_DANGLING_DELETES, "true")
+            .execute();
+
+    assertThat(result)
+        .extracting(
+            Result::addedDataFilesCount,
+            Result::rewrittenDataFilesCount,
+            Result::removedDeleteFilesCount)
+        .as("Should rewrite 2 data files into 1 and remove 1 dangled position 
delete file")
+        .containsExactly(1, 2, 1);
+    shouldHaveMinSequenceNumberInPartition(table, "data_file.partition.c1 == 
1", 3);
+
+    shouldHaveSnapshots(table, 5);
+    
assertThat(table.currentSnapshot().summary().get("total-position-deletes")).isEqualTo("0");
+    assertEquals("Rows must match", expectedRecords, currentData());
+  }
+
   @Test
   public void testBinPackWithDeleteAllData() {
     Map<String, String> options = Maps.newHashMap();
@@ -1697,6 +1825,21 @@ public class TestRewriteDataFilesAction extends TestBase 
{
     assertThat(numFiles).as("Did not have the expected number of 
files").isEqualTo(numExpected);
   }
 
+  protected long shouldHaveMinSequenceNumberInPartition(
+      Table table, String partitionFilter, long expected) {
+    long actual =
+        SparkTableUtil.loadMetadataTable(spark, table, 
MetadataTableType.ENTRIES)
+            .filter("status != 2")
+            .filter(partitionFilter)
+            .select("sequence_number")
+            .agg(min("sequence_number"))
+            .as(Encoders.LONG())
+            .collectAsList()
+            .get(0);
+    assertThat(actual).as("Did not have the expected min sequence 
number").isEqualTo(expected);
+    return actual;
+  }
+
   protected void shouldHaveSnapshots(Table table, int expectedSnapshots) {
     table.refresh();
     int actualSnapshots = Iterables.size(table.snapshots());
@@ -1893,6 +2036,11 @@ public class TestRewriteDataFilesAction extends TestBase 
{
             .getAsDouble();
   }
 
+  private void writeRecords(List<ThreeColumnRecord> records) {
+    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
+    writeDF(df);
+  }
+
   private void writeRecords(int files, int numRecords) {
     writeRecords(files, numRecords, 0);
   }
@@ -1946,7 +2094,10 @@ public class TestRewriteDataFilesAction extends TestBase 
{
           table
               .io()
               .newOutputFile(
-                  
table.locationProvider().newDataLocation(UUID.randomUUID().toString()));
+                  table
+                      .locationProvider()
+                      .newDataLocation(
+                          
FileFormat.PARQUET.addExtension(UUID.randomUUID().toString())));
       EncryptedOutputFile encryptedOutputFile =
           EncryptedFiles.encryptedOutput(outputFile, 
EncryptionKeyMetadata.EMPTY);
 
@@ -1972,6 +2123,68 @@ public class TestRewriteDataFilesAction extends TestBase 
{
     return results;
   }
 
+  private void writeEqDeleteRecord(
+      Table table, String partCol, Object partVal, String delCol, Object 
delVal) {
+    List<Integer> equalityFieldIds = 
Lists.newArrayList(table.schema().findField(delCol).fieldId());
+    Schema eqDeleteRowSchema = table.schema().select(delCol);
+    Record partitionRecord =
+        GenericRecord.create(table.schema().select(partCol))
+            .copy(ImmutableMap.of(partCol, partVal));
+    Record record = 
GenericRecord.create(eqDeleteRowSchema).copy(ImmutableMap.of(delCol, delVal));
+    writeEqDeleteRecord(table, equalityFieldIds, partitionRecord, 
eqDeleteRowSchema, record);
+  }
+
+  private void writeEqDeleteRecord(
+      Table table,
+      List<Integer> equalityFieldIds,
+      Record partitionRecord,
+      Schema eqDeleteRowSchema,
+      Record deleteRecord) {
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, 1, 
1).format(FileFormat.PARQUET).build();
+    GenericAppenderFactory appenderFactory =
+        new GenericAppenderFactory(
+            table.schema(),
+            table.spec(),
+            ArrayUtil.toIntArray(equalityFieldIds),
+            eqDeleteRowSchema,
+            null);
+
+    EncryptedOutputFile file =
+        createEncryptedOutputFile(createPartitionKey(table, partitionRecord), 
fileFactory);
+
+    EqualityDeleteWriter<Record> eqDeleteWriter =
+        appenderFactory.newEqDeleteWriter(
+            file, FileFormat.PARQUET, createPartitionKey(table, 
partitionRecord));
+
+    try (EqualityDeleteWriter<Record> clsEqDeleteWriter = eqDeleteWriter) {
+      clsEqDeleteWriter.write(deleteRecord);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    table.newRowDelta().addDeletes(eqDeleteWriter.toDeleteFile()).commit();
+  }
+
+  private PartitionKey createPartitionKey(Table table, Record record) {
+    if (table.spec().isUnpartitioned()) {
+      return null;
+    }
+
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(record);
+
+    return partitionKey;
+  }
+
+  private EncryptedOutputFile createEncryptedOutputFile(
+      PartitionKey partition, OutputFileFactory fileFactory) {
+    if (partition == null) {
+      return fileFactory.newOutputFile();
+    } else {
+      return fileFactory.newOutputFile(partition);
+    }
+  }
+
   private SparkActions actions() {
     return SparkActions.get();
   }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
index 37b6cd86fb..8547f9753f 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java
@@ -862,6 +862,7 @@ public class TestRewritePositionDeleteFilesAction extends 
CatalogTestBase {
         files.stream().collect(Collectors.groupingBy(ContentFile::partition));
     List<DeleteFile> deleteFiles =
         Lists.newArrayListWithCapacity(deleteFilesPerPartition * 
filesByPartition.size());
+    String suffix = String.format(".%s", 
FileFormat.PARQUET.name().toLowerCase());
 
     for (Map.Entry<StructLike, List<DataFile>> filesByPartitionEntry :
         filesByPartition.entrySet()) {
@@ -886,7 +887,7 @@ public class TestRewritePositionDeleteFilesAction extends 
CatalogTestBase {
           if (counter == deleteFileSize) {
             // Dump to file and reset variables
             OutputFile output =
-                Files.localOutput(File.createTempFile("junit", null, 
temp.toFile()));
+                Files.localOutput(File.createTempFile("junit", suffix, 
temp.toFile()));
             deleteFiles.add(FileHelpers.writeDeleteFile(table, output, 
partition, deletes).first());
             counter = 0;
             deletes.clear();

Reply via email to