This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 312592d578 Spark: Optimize snapshot expiry (#3457)
312592d578 is described below
commit 312592d5785c7d3cc129131d79a0875152c7ba83
Author: Szehon Ho <[email protected]>
AuthorDate: Thu Oct 13 22:31:12 2022 -0700
Spark: Optimize snapshot expiry (#3457)
---
.../java/org/apache/iceberg/AllManifestsTable.java | 9 +-
.../java/org/apache/iceberg/ReachableFileUtil.java | 18 +++-
.../iceberg/spark/actions/BaseSparkAction.java | 36 ++++++--
.../spark/actions/ExpireSnapshotsSparkAction.java | 37 ++++++--
.../spark/actions/TestExpireSnapshotsAction.java | 102 +++++++++++++++++++++
.../org/apache/iceberg/spark/data/TestHelpers.java | 9 ++
.../iceberg/spark/actions/BaseSparkAction.java | 36 ++++++--
.../spark/actions/ExpireSnapshotsSparkAction.java | 37 ++++++--
.../spark/actions/TestExpireSnapshotsAction.java | 102 +++++++++++++++++++++
.../org/apache/iceberg/spark/data/TestHelpers.java | 9 ++
10 files changed, 362 insertions(+), 33 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index 395efd0609..3f4b7e9b25 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -50,7 +50,9 @@ import org.apache.iceberg.util.StructProjection;
* <p>This table may return duplicate rows.
*/
public class AllManifestsTable extends BaseMetadataTable {
- private static final int REF_SNAPSHOT_ID = 18;
+ public static final Types.NestedField REF_SNAPSHOT_ID =
+      Types.NestedField.required(18, "reference_snapshot_id", Types.LongType.get());
+
private static final Schema MANIFEST_FILE_SCHEMA =
new Schema(
Types.NestedField.required(14, "content", Types.IntegerType.get()),
@@ -74,8 +76,7 @@ public class AllManifestsTable extends BaseMetadataTable {
                      Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
                      Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
                      Types.NestedField.optional(13, "upper_bound", Types.StringType.get())))),
- Types.NestedField.required(
- REF_SNAPSHOT_ID, "reference_snapshot_id", Types.LongType.get()));
+ REF_SNAPSHOT_ID);
AllManifestsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".all_manifests");
@@ -424,7 +425,7 @@ public class AllManifestsTable extends BaseMetadataTable {
}
private <T> boolean isSnapshotRef(BoundReference<T> ref) {
- return ref.fieldId() == REF_SNAPSHOT_ID;
+ return ref.fieldId() == REF_SNAPSHOT_ID.fieldId();
}
}
}
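
The REF_SNAPSHOT_ID constant above is promoted from a private int to a public Types.NestedField, so the field id, column name, and type now come from one shared definition that the Spark actions below can reference. A minimal sketch of what a caller can read from it (the class name is illustrative):

    import org.apache.iceberg.AllManifestsTable;

    public class RefSnapshotIdDemo {
      public static void main(String[] args) {
        // One definition carries the field id, column name, and type.
        System.out.println(AllManifestsTable.REF_SNAPSHOT_ID.fieldId()); // 18
        System.out.println(AllManifestsTable.REF_SNAPSHOT_ID.name());    // reference_snapshot_id
      }
    }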
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
index fdba8e2957..4a7064fda5 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -100,10 +101,25 @@ public class ReachableFileUtil {
* Returns locations of manifest lists in a table.
*
* @param table table for which manifestList needs to be fetched
- * @return the location of manifest Lists
+ * @return the location of manifest lists
*/
public static List<String> manifestListLocations(Table table) {
+ return manifestListLocations(table, null);
+ }
+
+ /**
+ * Returns locations of manifest lists in a table.
+ *
+ * @param table table for which manifestList needs to be fetched
+   * @param snapshotIds ids of snapshots for which manifest lists will be returned
+ * @return the location of manifest lists
+ */
+  public static List<String> manifestListLocations(Table table, Set<Long> snapshotIds) {
Iterable<Snapshot> snapshots = table.snapshots();
+ if (snapshotIds != null) {
+      snapshots = Iterables.filter(snapshots, s -> snapshotIds.contains(s.snapshotId()));
+ }
+
List<String> manifestListLocations = Lists.newArrayList();
for (Snapshot snapshot : snapshots) {
String manifestListLocation = snapshot.manifestListLocation();
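
The new overload above narrows the scan to the given snapshot ids; passing null preserves the old behavior of returning manifest lists for every snapshot. A minimal usage sketch, assuming a Table instance is in scope and using hypothetical snapshot ids:

    import java.util.List;
    import java.util.Set;
    import org.apache.iceberg.ReachableFileUtil;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;

    public class ManifestListDemo {
      static List<String> expiredManifestLists(Table table) {
        Set<Long> snapshotIds = Sets.newHashSet(1L, 2L); // hypothetical snapshot ids
        // Only manifest lists of the listed snapshots are returned;
        // manifestListLocations(table, null) would return them for all snapshots.
        return ReachableFileUtil.manifestListLocations(table, snapshotIds);
      }
    }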
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 74a0e70738..cdd80040fa 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,11 +25,13 @@ import static org.apache.spark.sql.functions.lit;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -61,6 +63,7 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -137,14 +140,17 @@ abstract class BaseSparkAction<ThisT> {
return new BaseTable(ops, metadataFileLocation);
}
- // builds a DF of delete and data file path and type by reading all manifests
protected Dataset<FileInfo> contentFileDS(Table table) {
+ return contentFileDS(table, null);
+ }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
Table serializableTable = SerializableTableWithSize.copyOf(table);
    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
    int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
- Dataset<ManifestFileBean> allManifests =
- loadMetadataTable(table, ALL_MANIFESTS)
+ Dataset<ManifestFileBean> manifestBeanDS =
+ manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
@@ -155,17 +161,35 @@ abstract class BaseSparkAction<ThisT> {
            .repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
-    return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
+    return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}
protected Dataset<FileInfo> manifestDS(Table table) {
- return loadMetadataTable(table, ALL_MANIFESTS)
+ return manifestDS(table, null);
+ }
+
+ protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
+ return manifestDF(table, snapshotIds)
.select(col("path"), lit(MANIFEST).as("type"))
.as(FileInfo.ENCODER);
}
+ private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
+ Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
+ if (snapshotIds != null) {
+      Column filterCond =
+          col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
+ return manifestDF.filter(filterCond);
+ } else {
+ return manifestDF;
+ }
+ }
+
protected Dataset<FileInfo> manifestListDS(Table table) {
-    List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
+ return manifestListDS(table, null);
+ }
+
+  protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
+    List<String> manifestLists = ReachableFileUtil.manifestListLocations(table, snapshotIds);
return toFileInfoDS(manifestLists, MANIFEST_LIST);
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index b47f367dde..d9af48c221 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -26,7 +26,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -165,7 +167,7 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
public Dataset<FileInfo> expireFiles() {
if (expiredFileDS == null) {
// fetch metadata before expiration
- Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
+ TableMetadata originalMetadata = ops.current();
// perform expiration
      org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots();
@@ -184,11 +186,16 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
expireSnapshots.cleanExpiredFiles(false).commit();
- // fetch metadata after expiration
- Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+ // fetch valid files after expiration
+ TableMetadata updatedMetadata = ops.refresh();
+ Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
+
+ // fetch files referenced by expired snapshots
+      Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
+      Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
// determine expired files
- this.expiredFileDS = originalFileDS.except(validFileDS);
+ this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
}
return expiredFileDS;
@@ -236,11 +243,25 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
    return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}
- private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
+ private Dataset<FileInfo> fileDS(TableMetadata metadata) {
+ return fileDS(metadata, null);
+ }
+
+  private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
Table staticTable = newStaticTable(metadata, table.io());
- return contentFileDS(staticTable)
- .union(manifestDS(staticTable))
- .union(manifestListDS(staticTable));
+ return contentFileDS(staticTable, snapshotIds)
+ .union(manifestDS(staticTable, snapshotIds))
+ .union(manifestListDS(staticTable, snapshotIds));
+ }
+
+ private Set<Long> findExpiredSnapshotIds(
+ TableMetadata originalMetadata, TableMetadata updatedMetadata) {
+    Set<Long> retainedSnapshots =
+        updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ return originalMetadata.snapshots().stream()
+ .map(Snapshot::snapshotId)
+ .filter(id -> !retainedSnapshots.contains(id))
+ .collect(Collectors.toSet());
}
private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
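
The core of the expiry change: expired files are now computed as the files reachable from expired snapshots minus the files still valid, rather than diffing two full reachable-file sets. findExpiredSnapshotIds is plain set arithmetic over snapshot ids; a self-contained sketch with hypothetical ids:

    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;

    public class ExpiredIdsDemo {
      public static void main(String[] args) {
        Set<Long> before = Sets.newHashSet(1L, 2L, 3L); // snapshot ids before expiration
        Set<Long> after = Sets.newHashSet(3L);          // snapshot ids retained afterwards
        Set<Long> expired =
            before.stream().filter(id -> !after.contains(id)).collect(Collectors.toSet());
        System.out.println(expired); // [1, 2]: only these snapshots' files are delete candidates
      }
    }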
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index b2421abfb9..8f10538f63 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -53,6 +54,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -1250,4 +1252,104 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
List<Row> untypedExpiredFiles = action.expire().collectAsList();
Assert.assertEquals("Expired results must match", 1,
untypedExpiredFiles.size());
}
+
+ @Test
+ public void testExpireFileDeletionMostExpired() {
+ testExpireFilesAreDeleted(5, 2);
+ }
+
+ @Test
+ public void testExpireFileDeletionMostRetained() {
+ testExpireFilesAreDeleted(2, 5);
+ }
+
+  public void testExpireFilesAreDeleted(int dataFilesExpired, int dataFilesRetained) {
+ // Add data files to be expired
+ Set<String> dataFiles = Sets.newHashSet();
+ for (int i = 0; i < dataFilesExpired; i++) {
+ DataFile df =
+ DataFiles.builder(SPEC)
+ .withPath(String.format("/path/to/data-expired-%d.parquet", i))
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1")
+ .withRecordCount(1)
+ .build();
+ dataFiles.add(df.path().toString());
+ table.newFastAppend().appendFile(df).commit();
+ }
+
+    // Delete them all; these files will be removed when the snapshots are expired
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+ // Clears "DELETED" manifests
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+ Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);
+
+ // Add data files to be retained, which are not deleted.
+ for (int i = 0; i < dataFilesRetained; i++) {
+ DataFile df =
+ DataFiles.builder(SPEC)
+ .withPath(String.format("/path/to/data-retained-%d.parquet", i))
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1")
+ .withRecordCount(1)
+ .build();
+ table.newFastAppend().appendFile(df).commit();
+ }
+
+ long end = rightAfterSnapshot();
+
+ Set<String> expectedDeletes = Sets.newHashSet();
+ expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
+ // all snapshot manifest lists except current will be deleted
+ expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
+    expectedDeletes.addAll(
+        manifestsBefore); // new manifests are reachable from current snapshot and not deleted
+    expectedDeletes.addAll(
+        dataFiles); // new data files are reachable from current snapshot and not deleted
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(end)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+    Assert.assertEquals(
+        "All reachable files before expiration should be deleted", expectedDeletes, deletedFiles);
+ }
+
+ @Test
+ public void testExpireSomeCheckFilesDeleted() {
+
+ table.newAppend().appendFile(FILE_A).commit();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ table.newAppend().appendFile(FILE_C).commit();
+
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ long after = rightAfterSnapshot();
+ waitUntilAfter(after);
+
+ table.newAppend().appendFile(FILE_D).commit();
+
+ table.newDelete().deleteFile(FILE_B).commit();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(after)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+ // C, D should be retained (live)
+ // B should be retained (previous snapshot points to it)
+ // A should be deleted
+ Assert.assertTrue(deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_B.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString()));
+ }
}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 69b14eead4..e5ad0ca213 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -37,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
@@ -808,4 +810,11 @@ public class TestHelpers {
return deleteFiles;
}
+
+ public static Set<String> reachableManifestPaths(Table table) {
+ return StreamSupport.stream(table.snapshots().spliterator(), false)
+ .flatMap(s -> s.allManifests(table.io()).stream())
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 74a0e70738..cdd80040fa 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -25,11 +25,13 @@ import static org.apache.spark.sql.functions.lit;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
@@ -61,6 +63,7 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -137,14 +140,17 @@ abstract class BaseSparkAction<ThisT> {
return new BaseTable(ops, metadataFileLocation);
}
- // builds a DF of delete and data file path and type by reading all manifests
protected Dataset<FileInfo> contentFileDS(Table table) {
+ return contentFileDS(table, null);
+ }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
Table serializableTable = SerializableTableWithSize.copyOf(table);
    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
    int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
- Dataset<ManifestFileBean> allManifests =
- loadMetadataTable(table, ALL_MANIFESTS)
+ Dataset<ManifestFileBean> manifestBeanDS =
+ manifestDF(table, snapshotIds)
.selectExpr(
"content",
"path",
@@ -155,17 +161,35 @@ abstract class BaseSparkAction<ThisT> {
            .repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
-    return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
+    return manifestBeanDS.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
}
protected Dataset<FileInfo> manifestDS(Table table) {
- return loadMetadataTable(table, ALL_MANIFESTS)
+ return manifestDS(table, null);
+ }
+
+ protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshotIds) {
+ return manifestDF(table, snapshotIds)
.select(col("path"), lit(MANIFEST).as("type"))
.as(FileInfo.ENCODER);
}
+ private Dataset<Row> manifestDF(Table table, Set<Long> snapshotIds) {
+ Dataset<Row> manifestDF = loadMetadataTable(table, ALL_MANIFESTS);
+ if (snapshotIds != null) {
+      Column filterCond =
+          col(AllManifestsTable.REF_SNAPSHOT_ID.name()).isInCollection(snapshotIds);
+ return manifestDF.filter(filterCond);
+ } else {
+ return manifestDF;
+ }
+ }
+
protected Dataset<FileInfo> manifestListDS(Table table) {
-    List<String> manifestLists = ReachableFileUtil.manifestListLocations(table);
+ return manifestListDS(table, null);
+ }
+
+  protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
+    List<String> manifestLists = ReachableFileUtil.manifestListLocations(table, snapshotIds);
return toFileInfoDS(manifestLists, MANIFEST_LIST);
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index b47f367dde..d9af48c221 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -26,7 +26,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -165,7 +167,7 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
public Dataset<FileInfo> expireFiles() {
if (expiredFileDS == null) {
// fetch metadata before expiration
- Dataset<FileInfo> originalFileDS = validFileDS(ops.current());
+ TableMetadata originalMetadata = ops.current();
// perform expiration
      org.apache.iceberg.ExpireSnapshots expireSnapshots = table.expireSnapshots();
@@ -184,11 +186,16 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
expireSnapshots.cleanExpiredFiles(false).commit();
- // fetch metadata after expiration
- Dataset<FileInfo> validFileDS = validFileDS(ops.refresh());
+ // fetch valid files after expiration
+ TableMetadata updatedMetadata = ops.refresh();
+ Dataset<FileInfo> validFileDS = fileDS(updatedMetadata);
+
+ // fetch files referenced by expired snapshots
+      Set<Long> deletedSnapshotIds = findExpiredSnapshotIds(originalMetadata, updatedMetadata);
+      Dataset<FileInfo> deleteCandidateFileDS = fileDS(originalMetadata, deletedSnapshotIds);
// determine expired files
- this.expiredFileDS = originalFileDS.except(validFileDS);
+ this.expiredFileDS = deleteCandidateFileDS.except(validFileDS);
}
return expiredFileDS;
@@ -236,11 +243,25 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
    return PropertyUtil.propertyAsBoolean(options(), STREAM_RESULTS, STREAM_RESULTS_DEFAULT);
}
- private Dataset<FileInfo> validFileDS(TableMetadata metadata) {
+ private Dataset<FileInfo> fileDS(TableMetadata metadata) {
+ return fileDS(metadata, null);
+ }
+
+  private Dataset<FileInfo> fileDS(TableMetadata metadata, Set<Long> snapshotIds) {
Table staticTable = newStaticTable(metadata, table.io());
- return contentFileDS(staticTable)
- .union(manifestDS(staticTable))
- .union(manifestListDS(staticTable));
+ return contentFileDS(staticTable, snapshotIds)
+ .union(manifestDS(staticTable, snapshotIds))
+ .union(manifestListDS(staticTable, snapshotIds));
+ }
+
+ private Set<Long> findExpiredSnapshotIds(
+ TableMetadata originalMetadata, TableMetadata updatedMetadata) {
+    Set<Long> retainedSnapshots =
+        updatedMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ return originalMetadata.snapshots().stream()
+ .map(Snapshot::snapshotId)
+ .filter(id -> !retainedSnapshots.contains(id))
+ .collect(Collectors.toSet());
}
private ExpireSnapshots.Result deleteFiles(Iterator<FileInfo> files) {
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index b2421abfb9..7004c6f8e0 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -53,6 +54,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -1250,4 +1252,104 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
List<Row> untypedExpiredFiles = action.expire().collectAsList();
Assert.assertEquals("Expired results must match", 1,
untypedExpiredFiles.size());
}
+
+ @Test
+ public void testExpireFileDeletionMostExpired() {
+    testExpireAllCheckFilesDeleted(5, 2);
+ }
+
+ @Test
+ public void testExpireFileDeletionMostRetained() {
+    testExpireAllCheckFilesDeleted(2, 5);
+ }
+
+  public void testExpireAllCheckFilesDeleted(int dataFilesExpired, int dataFilesRetained) {
+ // Add data files to be expired
+ Set<String> dataFiles = Sets.newHashSet();
+ for (int i = 0; i < dataFilesExpired; i++) {
+ DataFile df =
+ DataFiles.builder(SPEC)
+ .withPath(String.format("/path/to/data-expired-%d.parquet", i))
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1")
+ .withRecordCount(1)
+ .build();
+ dataFiles.add(df.path().toString());
+ table.newFastAppend().appendFile(df).commit();
+ }
+
+    // Delete them all; these files will be removed when the snapshots are expired
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+ // Clears "DELETED" manifests
+ table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+ Set<String> manifestsBefore = TestHelpers.reachableManifestPaths(table);
+
+ // Add data files to be retained, which are not deleted.
+ for (int i = 0; i < dataFilesRetained; i++) {
+ DataFile df =
+ DataFiles.builder(SPEC)
+ .withPath(String.format("/path/to/data-retained-%d.parquet", i))
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1")
+ .withRecordCount(1)
+ .build();
+ table.newFastAppend().appendFile(df).commit();
+ }
+
+ long end = rightAfterSnapshot();
+
+ Set<String> expectedDeletes = Sets.newHashSet();
+ expectedDeletes.addAll(ReachableFileUtil.manifestListLocations(table));
+ // all snapshot manifest lists except current will be deleted
+ expectedDeletes.remove(table.currentSnapshot().manifestListLocation());
+    expectedDeletes.addAll(
+        manifestsBefore); // new manifests are reachable from current snapshot and not deleted
+    expectedDeletes.addAll(
+        dataFiles); // new data files are reachable from current snapshot and not deleted
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(end)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+    Assert.assertEquals(
+        "All reachable files before expiration should be deleted", expectedDeletes, deletedFiles);
+ }
+
+ @Test
+ public void testExpireSomeCheckFilesDeleted() {
+
+ table.newAppend().appendFile(FILE_A).commit();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ table.newAppend().appendFile(FILE_C).commit();
+
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ long after = rightAfterSnapshot();
+ waitUntilAfter(after);
+
+ table.newAppend().appendFile(FILE_D).commit();
+
+ table.newDelete().deleteFile(FILE_B).commit();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(after)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+ // C, D should be retained (live)
+ // B should be retained (previous snapshot points to it)
+ // A should be deleted
+ Assert.assertTrue(deletedFiles.contains(FILE_A.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_B.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_C.path().toString()));
+ Assert.assertFalse(deletedFiles.contains(FILE_D.path().toString()));
+ }
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 69b14eead4..e5ad0ca213 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -37,6 +37,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
@@ -808,4 +810,11 @@ public class TestHelpers {
return deleteFiles;
}
+
+ public static Set<String> reachableManifestPaths(Table table) {
+ return StreamSupport.stream(table.snapshots().spliterator(), false)
+ .flatMap(s -> s.allManifests(table.io()).stream())
+ .map(ManifestFile::path)
+ .collect(Collectors.toSet());
+ }
}