This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 312592d578 Spark: Optimize snapshot expiry (#3457)
312592d578 is described below

commit 312592d5785c7d3cc129131d79a0875152c7ba83
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Oct 13 22:31:12 2022 -0700

    Spark: Optimize snapshot expiry (#3457)
---
 .../java/org/apache/iceberg/AllManifestsTable.java |   9 +-
 .../java/org/apache/iceberg/ReachableFileUtil.java |  18 +++-
 .../iceberg/spark/actions/BaseSparkAction.java     |  36 ++++++--
 .../spark/actions/ExpireSnapshotsSparkAction.java  |  37 ++++++--
 .../spark/actions/TestExpireSnapshotsAction.java   | 102 +++++++++++++++++++++
 .../org/apache/iceberg/spark/data/TestHelpers.java |   9 ++
 .../iceberg/spark/actions/BaseSparkAction.java     |  36 ++++++--
 .../spark/actions/ExpireSnapshotsSparkAction.java  |  37 ++++++--
 .../spark/actions/TestExpireSnapshotsAction.java   | 102 +++++++++++++++++++++
 .../org/apache/iceberg/spark/data/TestHelpers.java |   9 ++
 10 files changed, 362 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java 
b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 395efd0609..3f4b7e9b25 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -50,7 +50,9 @@ import org.apache.iceberg.util.StructProjection;
  * <p>This table may return duplicate rows.
  */
 public class AllManifestsTable extends BaseMetadataTable {
-  private static final int REF_SNAPSHOT_ID = 18;
+  public static final Types.NestedField REF_SNAPSHOT_ID =
+      Types.NestedField.required(18, "reference_snapshot_id", 
Types.LongType.get());
+
   private static final Schema MANIFEST_FILE_SCHEMA =
       new Schema(
           Types.NestedField.required(14, "content", Types.IntegerType.get()),
@@ -74,8 +76,7 @@ public class AllManifestsTable extends BaseMetadataTable {
                       Types.NestedField.required(11, "contains_nan", 
Types.BooleanType.get()),
                       Types.NestedField.optional(12, "lower_bound", 
Types.StringType.get()),
                       Types.NestedField.optional(13, "upper_bound", 
Types.StringType.get())))),
-          Types.NestedField.required(
-              REF_SNAPSHOT_ID, "reference_snapshot_id", Types.LongType.get()));
+          REF_SNAPSHOT_ID);
 
   AllManifestsTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".all_manifests");
@@ -424,7 +425,7 @@ public class AllManifestsTable extends BaseMetadataTable {
       }
 
       private <T> boolean isSnapshotRef(BoundReference<T> ref) {
-        return ref.fieldId() == REF_SNAPSHOT_ID;
+        return ref.fieldId() == REF_SNAPSHOT_ID.fieldId();
       }
     }
   }
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java 
b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
index fdba8e2957..4a7064fda5 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.TableMetadata.MetadataLogEntry;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.slf4j.Logger;
@@ -100,10 +101,25 @@ public class ReachableFileUtil {
    * Returns locations of manifest lists in a table.
    *
    * @param table table for which manifestList needs to be fetched
-   * @return the location of manifest Lists
+   * @return the location of manifest lists
    */
   public static List<String> manifestListLocations(Table table) {
+    return manifestListLocations(table, null);
+  }
+
+  /**
+   * Returns locations of manifest lists in a table.
+   *
+   * @param table table for which manifestList needs to be fetched
+   * @param snapshotIds ids of snapshots for which manifest lists will be 
returned
+   * @return the location of manifest lists
+   */
+  public static List<String> manifestListLocations(Table table, Set<Long> 
snapshotIds) {
     Iterable<Snapshot> snapshots = table.snapshots();
+    if (snapshotIds != null) {
+      snapshots = Iterables.filter(snapshots, s -> 
snapshotIds.contains(s.snapshotId()));
+    }
+
     List<String> manifestListLocations = Lists.newArrayList();
     for (Snapshot snapshot : snapshots) {
       String manifestListLocation = snapshot.manifestListLocation();
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 74a0e70738..cdd80040fa 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,11 +25,13 @@ import static org.apache.spark.sql.functions.lit;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import org.apache.iceberg.AllManifestsTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
@@ -61,6 +63,7 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -137,14 +140,17 @@ abstract class BaseSparkAction<ThisT> {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file path and type by reading all manifests
   protected Dataset<FileInfo> contentFileDS(Table table) {
+    return contentFileDS(table, null);
+  }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> 
snapshotIds) {
     Table serializableTable = SerializableTableWithSize.copyOf(table);
     Broadcast<Table> tableBroadcast = 
sparkContext.broadcast(serializableTable);
     int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
 
-    Dataset<ManifestFileBean> allManifests =
-        loadMetadataTable(table, ALL_MANIFESTS)
+    Dataset<ManifestFileBean> manifestBeanDS =
+        manifestDF(table, snapshotIds)
             .selectExpr(
                 "content",
                 "path",
@@ -155,17 +161,35 @@ abstract class BaseSparkAction<ThisT> {
             .repartition(numShufflePartitions) // avoid adaptive execution 
combining tasks
             .as(ManifestFileBean.ENCODER);
 
-    return allManifests.flatMap(new ReadManifest(tableBroadcast), 
FileInfo.ENCODER);
+    return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), 
FileInfo.ENCODER);
   }
 
   protected Dataset<FileInfo> manifestDS(Table table) {
-    return loadMetadataTable(table, ALL_MANIFESTS)
+    return manifestDS(table, null);
+  }
+
+  protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
+    return manifestDF(table, snapshotIds)
         .select(col("path"), lit(MANIFEST).as("type"))
         .as(FileInfo.ENCODER);
   }
 
+  private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
+    Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
+    if (snapshotIds != null) {
+      Column filterCond = 
col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
+      return manifestDF.filter(filterCond);
+    } else {
+      return manifestDF;
+    }
+  }
+
   protected Dataset<FileInfo> manifestListDS(Table table) {
-    List<String> manifestLists = 
ReachableFileUtil.manifestListLocations(table);
+    return manifestListDS(table, null);
+  }
+
+  protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> 
snapshotIds) {
+    List<String> manifestLists = 
ReachableFileUtil.manifestListLocations(table, snapshotIds);
     return toFileInfoDS(manifestLists, MANIFEST_LIST);
   }
 
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index b47f367dde..d9af48c221 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -26,7 +26,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -165,7 +167,7 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
   public Dataset<FileInfo> expireFiles() {
     if (expiredFileDS == null) {
       // fetch metadata before expiration
-      Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
+      TableMetadata originalMetadata = ops.current();
 
       // perform expiration
       org.apache.iceberg.ExpireSnapshots expireSnapshots = 
table.expireSnapshots();
@@ -184,11 +186,16 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
 
       expireSnapshots.cleanExpiredFiles(false).commit();
 
-      // fetch metadata after expiration
-      Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+      // fetch valid files after expiration
+      TableMetadata updatedMetadata = ops.refresh();
+      Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
+
+      // fetch files referenced by expired snapshots
+      Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, 
updatedMetadata);
+      Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, 
deletedSnapshotIds);
 
       // determine expired files
-      this.expiredFileDS = originalFileDS.except(validFileDS);
+      this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
     }
 
     return expiredFileDS;
@@ -236,11 +243,25 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
     return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, 
STREAM_RESULTS_DEFAULT);
   }
 
-  private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
+  private Dataset<FileInfo> fileDS(TableMetadata metadata) {
+    return fileDS(metadata, null);
+  }
+
+  private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> 
snapshotIds) {
     Table staticTable = newStaticTable(metadata, table.io());
-    return contentFileDS(staticTable)
-        .union(manifestDS(staticTable))
-        .union(manifestListDS(staticTable));
+    return contentFileDS(staticTable, snapshotIds)
+        .union(manifestDS(staticTable, snapshotIds))
+        .union(manifestListDS(staticTable, snapshotIds));
+  }
+
+  private Set<Long> findExpiredSnapshotIds(
+      TableMetadata originalMetadata, TableMetadata updatedMetadata) {
+    Set<Long> retainedSnapshots =
+        
updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+    return originalMetadata.snapshots().stream()
+        .map(Snapshot::snapshotId)
+        .filter(id -> !retainedSnapshots.contains(id))
+        .collect(Collectors.toSet());
   }
 
   private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index b2421abfb9..8f10538f63 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -53,6 +54,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -1250,4 +1252,104 @@ public class TestExpireSnapshotsAction extends 
SparkTestBase {
     List<Row> untypedExpiredFiles = action.expire().collectAsList();
     Assert.assertEquals("Expired results must match", 1, 
untypedExpiredFiles.size());
   }
+
+  @Test
+  public void testExpireFileDeletionMostExpired() {
+    testExpireFilesAreDeleted(5, 2);
+  }
+
+  @Test
+  public void testExpireFileDeletionMostRetained() {
+    testExpireFilesAreDeleted(2, 5);
+  }
+
+  public void testExpireFilesAreDeleted(int dataFilesExpired, int 
dataFilesRetained) {
+    // Add data files to be expired
+    Set<String> dataFiles = Sets.newHashSet();
+    for (int i = 0; i < dataFilesExpired; i++) {
+      DataFile df =
+          DataFiles.builder(SPEC)
+              .withPath(String.format("/path/to/data-expired-%d.parquet", i))
+              .withFileSizeInBytes(10)
+              .withPartitionPath("c1=1")
+              .withRecordCount(1)
+              .build();
+      dataFiles.add(df.path().toString());
+      table.newFastAppend().appendFile(df).commit();
+    }
+
+    // Delete them all, these will be deleted on expire snapshot
+    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+    // Clears "DELETED" manifests
+    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+    Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);
+
+    // Add data files to be retained, which are not deleted.
+    for (int i = 0; i < dataFilesRetained; i++) {
+      DataFile df =
+          DataFiles.builder(SPEC)
+              .withPath(String.format("/path/to/data-retained-%d.parquet", i))
+              .withFileSizeInBytes(10)
+              .withPartitionPath("c1=1")
+              .withRecordCount(1)
+              .build();
+      table.newFastAppend().appendFile(df).commit();
+    }
+
+    long end = rightAfterSnapshot();
+
+    Set<String> expectedDeletes = Sets.newHashSet();
+    expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
+    // all snapshot manifest lists except current will be deleted
+    expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
+    expectedDeletes.addAll(
+        manifestsBefore); // new manifests are reachable from current snapshot 
and not deleted
+    expectedDeletes.addAll(
+        dataFiles); // new data files are reachable from current snapshot and 
not deleted
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    SparkActions.get()
+        .expireSnapshots(table)
+        .expireOlderThan(end)
+        .deleteWith(deletedFiles::add)
+        .execute();
+
+    Assert.assertEquals(
+        "All reachable files before expiration should be deleted", 
expectedDeletes, deletedFiles);
+  }
+
+  @Test
+  public void testExpireSomeCheckFilesDeleted() {
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    long after = rightAfterSnapshot();
+    waitUntilAfter(after);
+
+    table.newAppend().appendFile(FILE_D).commit();
+
+    table.newDelete().deleteFile(FILE_B).commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    SparkActions.get()
+        .expireSnapshots(table)
+        .expireOlderThan(after)
+        .deleteWith(deletedFiles::add)
+        .execute();
+
+    // C, D should be retained (live)
+    // B should be retained (previous snapshot points to it)
+    // A should be deleted
+    Assert.assertTrue(deletedFiles.contains(FILE_A.path().toString()));
+    Assert.assertFalse(deletedFiles.contains(FILE_B.path().toString()));
+    Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString()));
+    Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString()));
+  }
 }
diff --git 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 69b14eead4..e5ad0ca213 100644
--- 
a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -37,6 +37,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
@@ -808,4 +810,11 @@ public class TestHelpers {
 
     return deleteFiles;
   }
+
+  public static Set<String> reachableManifestPaths(Table table) {
+    return StreamSupport.stream(table.snapshots().spliterator(), false)
+        .flatMap(s -> s.allManifests(table.io()).stream())
+        .map(ManifestFile::path)
+        .collect(Collectors.toSet());
+  }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 74a0e70738..cdd80040fa 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,11 +25,13 @@ import static org.apache.spark.sql.functions.lit;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import org.apache.iceberg.AllManifestsTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
@@ -61,6 +63,7 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -137,14 +140,17 @@ abstract class BaseSparkAction<ThisT> {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file path and type by reading all manifests
   protected Dataset<FileInfo> contentFileDS(Table table) {
+    return contentFileDS(table, null);
+  }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> 
snapshotIds) {
     Table serializableTable = SerializableTableWithSize.copyOf(table);
     Broadcast<Table> tableBroadcast = 
sparkContext.broadcast(serializableTable);
     int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
 
-    Dataset<ManifestFileBean> allManifests =
-        loadMetadataTable(table, ALL_MANIFESTS)
+    Dataset<ManifestFileBean> manifestBeanDS =
+        manifestDF(table, snapshotIds)
             .selectExpr(
                 "content",
                 "path",
@@ -155,17 +161,35 @@ abstract class BaseSparkAction<ThisT> {
             .repartition(numShufflePartitions) // avoid adaptive execution 
combining tasks
             .as(ManifestFileBean.ENCODER);
 
-    return allManifests.flatMap(new ReadManifest(tableBroadcast), 
FileInfo.ENCODER);
+    return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), 
FileInfo.ENCODER);
   }
 
   protected Dataset<FileInfo> manifestDS(Table table) {
-    return loadMetadataTable(table, ALL_MANIFESTS)
+    return manifestDS(table, null);
+  }
+
+  protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
+    return manifestDF(table, snapshotIds)
         .select(col("path"), lit(MANIFEST).as("type"))
         .as(FileInfo.ENCODER);
   }
 
+  private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
+    Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
+    if (snapshotIds != null) {
+      Column filterCond = 
col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
+      return manifestDF.filter(filterCond);
+    } else {
+      return manifestDF;
+    }
+  }
+
   protected Dataset<FileInfo> manifestListDS(Table table) {
-    List<String> manifestLists = 
ReachableFileUtil.manifestListLocations(table);
+    return manifestListDS(table, null);
+  }
+
+  protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> 
snapshotIds) {
+    List<String> manifestLists = 
ReachableFileUtil.manifestListLocations(table, snapshotIds);
     return toFileInfoDS(manifestLists, MANIFEST_LIST);
   }
 
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index b47f367dde..d9af48c221 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -26,7 +26,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -165,7 +167,7 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
   public Dataset<FileInfo> expireFiles() {
     if (expiredFileDS == null) {
       // fetch metadata before expiration
-      Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
+      TableMetadata originalMetadata = ops.current();
 
       // perform expiration
       org.apache.iceberg.ExpireSnapshots expireSnapshots = 
table.expireSnapshots();
@@ -184,11 +186,16 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
 
       expireSnapshots.cleanExpiredFiles(false).commit();
 
-      // fetch metadata after expiration
-      Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+      // fetch valid files after expiration
+      TableMetadata updatedMetadata = ops.refresh();
+      Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
+
+      // fetch files referenced by expired snapshots
+      Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, 
updatedMetadata);
+      Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, 
deletedSnapshotIds);
 
       // determine expired files
-      this.expiredFileDS = originalFileDS.except(validFileDS);
+      this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
     }
 
     return expiredFileDS;
@@ -236,11 +243,25 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
     return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, 
STREAM_RESULTS_DEFAULT);
   }
 
-  private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
+  private Dataset<FileInfo> fileDS(TableMetadata metadata) {
+    return fileDS(metadata, null);
+  }
+
+  private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> 
snapshotIds) {
     Table staticTable = newStaticTable(metadata, table.io());
-    return contentFileDS(staticTable)
-        .union(manifestDS(staticTable))
-        .union(manifestListDS(staticTable));
+    return contentFileDS(staticTable, snapshotIds)
+        .union(manifestDS(staticTable, snapshotIds))
+        .union(manifestListDS(staticTable, snapshotIds));
+  }
+
+  private Set<Long> findExpiredSnapshotIds(
+      TableMetadata originalMetadata, TableMetadata updatedMetadata) {
+    Set<Long> retainedSnapshots =
+        
updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+    return originalMetadata.snapshots().stream()
+        .map(Snapshot::snapshotId)
+        .filter(id -> !retainedSnapshots.contains(id))
+        .collect(Collectors.toSet());
   }
 
   private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index b2421abfb9..7004c6f8e0 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -53,6 +54,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -1250,4 +1252,104 @@ public class TestExpireSnapshotsAction extends 
SparkTestBase {
     List<Row> untypedExpiredFiles = action.expire().collectAsList();
     Assert.assertEquals("Expired results must match", 1, 
untypedExpiredFiles.size());
   }
+
+  @Test
+  public void testExpireFileDeletionMostExpired() {
+    textExpireAllCheckFilesDeleted(5, 2);
+  }
+
+  @Test
+  public void testExpireFileDeletionMostRetained() {
+    textExpireAllCheckFilesDeleted(2, 5);
+  }
+
+  public void textExpireAllCheckFilesDeleted(int dataFilesExpired, int 
dataFilesRetained) {
+    // Add data files to be expired
+    Set<String> dataFiles = Sets.newHashSet();
+    for (int i = 0; i < dataFilesExpired; i++) {
+      DataFile df =
+          DataFiles.builder(SPEC)
+              .withPath(String.format("/path/to/data-expired-%d.parquet", i))
+              .withFileSizeInBytes(10)
+              .withPartitionPath("c1=1")
+              .withRecordCount(1)
+              .build();
+      dataFiles.add(df.path().toString());
+      table.newFastAppend().appendFile(df).commit();
+    }
+
+    // Delete them all, these will be deleted on expire snapshot
+    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+    // Clears "DELETED" manifests
+    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+    Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);
+
+    // Add data files to be retained, which are not deleted.
+    for (int i = 0; i < dataFilesRetained; i++) {
+      DataFile df =
+          DataFiles.builder(SPEC)
+              .withPath(String.format("/path/to/data-retained-%d.parquet", i))
+              .withFileSizeInBytes(10)
+              .withPartitionPath("c1=1")
+              .withRecordCount(1)
+              .build();
+      table.newFastAppend().appendFile(df).commit();
+    }
+
+    long end = rightAfterSnapshot();
+
+    Set<String> expectedDeletes = Sets.newHashSet();
+    expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
+    // all snapshot manifest lists except current will be deleted
+    expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
+    expectedDeletes.addAll(
+        manifestsBefore); // new manifests are reachable from current snapshot 
and not deleted
+    expectedDeletes.addAll(
+        dataFiles); // new data files are reachable from current snapshot and 
not deleted
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    SparkActions.get()
+        .expireSnapshots(table)
+        .expireOlderThan(end)
+        .deleteWith(deletedFiles::add)
+        .execute();
+
+    Assert.assertEquals(
+        "All reachable files before expiration should be deleted", 
expectedDeletes, deletedFiles);
+  }
+
+  @Test
+  public void testExpireSomeCheckFilesDeleted() {
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    long after = rightAfterSnapshot();
+    waitUntilAfter(after);
+
+    table.newAppend().appendFile(FILE_D).commit();
+
+    table.newDelete().deleteFile(FILE_B).commit();
+
+    Set<String> deletedFiles = Sets.newHashSet();
+    SparkActions.get()
+        .expireSnapshots(table)
+        .expireOlderThan(after)
+        .deleteWith(deletedFiles::add)
+        .execute();
+
+    // C, D should be retained (live)
+    // B should be retained (previous snapshot points to it)
+    // A should be deleted
+    Assert.assertTrue(deletedFiles.contains(FILE_A.path().toString()));
+    Assert.assertFalse(deletedFiles.contains(FILE_B.path().toString()));
+    Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString()));
+    Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString()));
+  }
 }
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 69b14eead4..e5ad0ca213 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -37,6 +37,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
@@ -808,4 +810,11 @@ public class TestHelpers {
 
     return deleteFiles;
   }
+
+  public static Set<String> reachableManifestPaths(Table table) {
+    return StreamSupport.stream(table.snapshots().spliterator(), false)
+        .flatMap(s -> s.allManifests(table.io()).stream())
+        .map(ManifestFile::path)
+        .collect(Collectors.toSet());
+  }
 }

Reply via email to