This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4920a0ca28 Spark 3.4, 3.5: Expose cleanExpiredMetadata in
expire_snapshots Spark procedure (#13553)
4920a0ca28 is described below
commit 4920a0ca28f7bfa9113b9c4abcb54c402504e7f3
Author: gaborkaszab <[email protected]>
AuthorDate: Mon Jul 14 19:31:50 2025 +0200
Spark 3.4, 3.5: Expose cleanExpiredMetadata in expire_snapshots Spark
procedure (#13553)
---
.../extensions/TestExpireSnapshotsProcedure.java | 60 +++++++++++++++++++++
.../spark/actions/ExpireSnapshotsSparkAction.java | 11 +++-
.../spark/procedures/ExpireSnapshotsProcedure.java | 8 ++-
.../spark/actions/TestExpireSnapshotsAction.java | 63 ++++++++++++++++++++++
.../extensions/TestExpireSnapshotsProcedure.java | 60 +++++++++++++++++++++
.../spark/actions/ExpireSnapshotsSparkAction.java | 11 +++-
.../spark/procedures/ExpireSnapshotsProcedure.java | 8 ++-
.../spark/actions/TestExpireSnapshotsAction.java | 63 ++++++++++++++++++++++
8 files changed, 280 insertions(+), 4 deletions(-)
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 1560abf112..f5060ba8e7 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -540,6 +540,66 @@ public class TestExpireSnapshotsProcedure extends
ExtensionsTestBase {
.exists();
}
+ @TestTemplate
+ public void testNoExpiredMetadataCleanupByDefault() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("ALTER TABLE %s ADD COLUMN extra_col int", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b', 21)", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ assertThat(table.snapshots()).as("Should be 2 snapshots").hasSize(2);
+ assertThat(table.schemas()).as("Should have 2 schemas").hasSize(2);
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',
table => '%s')",
+ catalogName,
+ Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())),
+ tableIdent);
+
+ table.refresh();
+ assertThat(table.schemas()).as("Should have 2 schemas").hasSize(2);
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L,
1L, 0L)), output);
+ }
+
+ @TestTemplate
+ public void testCleanExpiredMetadata() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("ALTER TABLE %s ADD COLUMN extra_col int", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b', 21)", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ assertThat(table.snapshots()).as("Should be 2 snapshots").hasSize(2);
+ assertThat(table.schemas()).as("Should have 2 schemas").hasSize(2);
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.expire_snapshots("
+ + "older_than => TIMESTAMP '%s', "
+ + "clean_expired_metadata => true, "
+ + "table => '%s')",
+ catalogName,
+ Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())),
+ tableIdent);
+
+ table.refresh();
+
+ assertThat(table.schemas().keySet())
+ .as("Should have only the latest schema")
+ .containsExactly(table.schema().schemaId());
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L,
1L, 0L)), output);
+ }
+
private static StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation,
FileIO fileIO)
throws IOException {
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 2468497e42..8c4fda5b48 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -79,6 +79,7 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
private Consumer<String> deleteFunc = null;
private ExecutorService deleteExecutorService = null;
private Dataset<FileInfo> expiredFileDS = null;
+ private boolean cleanExpiredMetadata = false;
ExpireSnapshotsSparkAction(SparkSession spark, Table table) {
super(spark);
@@ -129,6 +130,12 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
return this;
}
+ @Override
+ public ExpireSnapshotsSparkAction cleanExpiredMetadata(boolean clean) {
+ this.cleanExpiredMetadata = clean;
+ return this;
+ }
+
/**
* Expires snapshots and commits the changes to the table, returning a
Dataset of files to delete.
*
@@ -158,7 +165,7 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
expireSnapshots = expireSnapshots.retainLast(retainLastValue);
}
-    expireSnapshots.cleanExpiredFiles(false).commit();
+    expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata).cleanExpiredFiles(false).commit();
// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();
@@ -202,6 +209,8 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
}
}
+ options.add("clean_expired_metadata=" + cleanExpiredMetadata);
+
return String.format("Expiring snapshots (%s) in %s",
COMMA_JOINER.join(options), table.name());
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index b84d69ea9c..9139b465b1 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -53,7 +53,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
ProcedureParameter.optional("max_concurrent_deletes",
DataTypes.IntegerType),
ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
- ProcedureParameter.optional("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType))
+ ProcedureParameter.optional("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType)),
+ ProcedureParameter.optional("clean_expired_metadata",
DataTypes.BooleanType)
};
private static final StructType OUTPUT_TYPE =
@@ -104,6 +105,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure
{
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
long[] snapshotIds = args.isNullAt(5) ? null :
args.getArray(5).toLongArray();
+ Boolean cleanExpiredMetadata = args.isNullAt(6) ? null :
args.getBoolean(6);
Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
@@ -148,6 +150,10 @@ public class ExpireSnapshotsProcedure extends
BaseProcedure {
ExpireSnapshotsSparkAction.STREAM_RESULTS,
Boolean.toString(streamResult));
}
+ if (cleanExpiredMetadata != null) {
+ action.cleanExpiredMetadata(cleanExpiredMetadata);
+ }
+
ExpireSnapshots.Result result = action.execute();
return toOutputRows(result);
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index d43d4dc05a..d9df44774b 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1343,4 +1343,67 @@ public class TestExpireSnapshotsAction extends TestBase {
.contains(FILE_A.location())
.doesNotContain(FILE_B.location(), FILE_C.location(),
FILE_D.location());
}
+
+ @TestTemplate
+ public void testNoExpiredMetadataCleanupByDefault() {
+ table.newAppend().appendFile(FILE_A).commit();
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ long after = rightAfterSnapshot();
+
+ table.updateSchema().addColumn("extra_col",
Types.IntegerType.get()).commit();
+ table.newAppend().appendFile(FILE_B).commit();
+
+ Set<Integer> schemaIds = table.schemas().keySet();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(after)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+    assertThat(table.schemas().keySet()).containsExactlyInAnyOrderElementsOf(schemaIds);
+    assertThat(deletedFiles).contains(FILE_A.location()).doesNotContain(FILE_B.location());
+ }
+
+ @TestTemplate
+ public void testCleanExpiredMetadata() {
+ table.newAppend().appendFile(FILE_A).commit();
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ long after = rightAfterSnapshot();
+
+ table.updateSchema().addColumn("extra_col",
Types.IntegerType.get()).commit();
+ table.updateSpec().addField("extra_col").commit();
+
+ DataFile fileInNewSpec =
+ DataFiles.builder(table.spec())
+ .withPath("/path/to/data-in-new-spec.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1/extra_col=11")
+ .withRecordCount(1)
+ .build();
+
+ table.newAppend().appendFile(fileInNewSpec).commit();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(after)
+ .deleteWith(deletedFiles::add)
+ .cleanExpiredMetadata(true)
+ .execute();
+
+ assertThat(table.specs().keySet())
+ .as("Should have only the latest spec")
+ .containsExactly(table.spec().specId());
+ assertThat(table.schemas().keySet())
+ .as("Should have only the latest schema")
+ .containsExactly(table.schema().schemaId());
+ assertThat(deletedFiles)
+ .as("Should remove the file from first snapshot")
+ .contains(FILE_A.location())
+ .doesNotContain(fileInNewSpec.location());
+ }
}
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 1560abf112..f5060ba8e7 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -540,6 +540,66 @@ public class TestExpireSnapshotsProcedure extends
ExtensionsTestBase {
.exists();
}
+ @TestTemplate
+ public void testNoExpiredMetadataCleanupByDefault() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("ALTER TABLE %s ADD COLUMN extra_col int", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b', 21)", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ assertThat(table.snapshots()).as("Should be 2 snapshots").hasSize(2);
+ assertThat(table.schemas()).as("Should have 2 schemas").hasSize(2);
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',
table => '%s')",
+ catalogName,
+ Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())),
+ tableIdent);
+
+ table.refresh();
+ assertThat(table.schemas()).as("Should have 2 schemas").hasSize(2);
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L,
1L, 0L)), output);
+ }
+
+ @TestTemplate
+ public void testCleanExpiredMetadata() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("ALTER TABLE %s ADD COLUMN extra_col int", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b', 21)", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ assertThat(table.snapshots()).as("Should be 2 snapshots").hasSize(2);
+ assertThat(table.schemas()).as("Should have 2 schemas").hasSize(2);
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.expire_snapshots("
+ + "older_than => TIMESTAMP '%s', "
+ + "clean_expired_metadata => true, "
+ + "table => '%s')",
+ catalogName,
+ Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis())),
+ tableIdent);
+
+ table.refresh();
+
+ assertThat(table.schemas().keySet())
+ .as("Should have only the latest schema")
+ .containsExactly(table.schema().schemaId());
+ assertEquals(
+ "Procedure output must match", ImmutableList.of(row(0L, 0L, 0L, 0L,
1L, 0L)), output);
+ }
+
private static StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation,
FileIO fileIO)
throws IOException {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 2468497e42..8c4fda5b48 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -79,6 +79,7 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
private Consumer<String> deleteFunc = null;
private ExecutorService deleteExecutorService = null;
private Dataset<FileInfo> expiredFileDS = null;
+ private boolean cleanExpiredMetadata = false;
ExpireSnapshotsSparkAction(SparkSession spark, Table table) {
super(spark);
@@ -129,6 +130,12 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
return this;
}
+ @Override
+ public ExpireSnapshotsSparkAction cleanExpiredMetadata(boolean clean) {
+ this.cleanExpiredMetadata = clean;
+ return this;
+ }
+
/**
* Expires snapshots and commits the changes to the table, returning a
Dataset of files to delete.
*
@@ -158,7 +165,7 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
expireSnapshots = expireSnapshots.retainLast(retainLastValue);
}
-    expireSnapshots.cleanExpiredFiles(false).commit();
+    expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata).cleanExpiredFiles(false).commit();
// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();
@@ -202,6 +209,8 @@ public class ExpireSnapshotsSparkAction extends
BaseSparkAction<ExpireSnapshotsS
}
}
+ options.add("clean_expired_metadata=" + cleanExpiredMetadata);
+
return String.format("Expiring snapshots (%s) in %s",
COMMA_JOINER.join(options), table.name());
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index b84d69ea9c..9139b465b1 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -53,7 +53,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
ProcedureParameter.optional("max_concurrent_deletes",
DataTypes.IntegerType),
ProcedureParameter.optional("stream_results", DataTypes.BooleanType),
- ProcedureParameter.optional("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType))
+ ProcedureParameter.optional("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType)),
+ ProcedureParameter.optional("clean_expired_metadata",
DataTypes.BooleanType)
};
private static final StructType OUTPUT_TYPE =
@@ -104,6 +105,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure
{
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
long[] snapshotIds = args.isNullAt(5) ? null :
args.getArray(5).toLongArray();
+ Boolean cleanExpiredMetadata = args.isNullAt(6) ? null :
args.getBoolean(6);
Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
@@ -148,6 +150,10 @@ public class ExpireSnapshotsProcedure extends
BaseProcedure {
ExpireSnapshotsSparkAction.STREAM_RESULTS,
Boolean.toString(streamResult));
}
+ if (cleanExpiredMetadata != null) {
+ action.cleanExpiredMetadata(cleanExpiredMetadata);
+ }
+
ExpireSnapshots.Result result = action.execute();
return toOutputRows(result);
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 7aa569041d..0668f9ffb9 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -1342,4 +1342,67 @@ public class TestExpireSnapshotsAction extends TestBase {
.contains(FILE_A.location())
.doesNotContain(FILE_B.location(), FILE_C.location(),
FILE_D.location());
}
+
+ @TestTemplate
+ public void testNoExpiredMetadataCleanupByDefault() {
+ table.newAppend().appendFile(FILE_A).commit();
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ long after = rightAfterSnapshot();
+
+ table.updateSchema().addColumn("extra_col",
Types.IntegerType.get()).commit();
+ table.newAppend().appendFile(FILE_B).commit();
+
+ Set<Integer> schemaIds = table.schemas().keySet();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(after)
+ .deleteWith(deletedFiles::add)
+ .execute();
+
+    assertThat(table.schemas().keySet()).containsExactlyInAnyOrderElementsOf(schemaIds);
+    assertThat(deletedFiles).contains(FILE_A.location()).doesNotContain(FILE_B.location());
+ }
+
+ @TestTemplate
+ public void testCleanExpiredMetadata() {
+ table.newAppend().appendFile(FILE_A).commit();
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ long after = rightAfterSnapshot();
+
+ table.updateSchema().addColumn("extra_col",
Types.IntegerType.get()).commit();
+ table.updateSpec().addField("extra_col").commit();
+
+ DataFile fileInNewSpec =
+ DataFiles.builder(table.spec())
+ .withPath("/path/to/data-in-new-spec.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1/extra_col=11")
+ .withRecordCount(1)
+ .build();
+
+ table.newAppend().appendFile(fileInNewSpec).commit();
+
+ Set<String> deletedFiles = Sets.newHashSet();
+ SparkActions.get()
+ .expireSnapshots(table)
+ .expireOlderThan(after)
+ .deleteWith(deletedFiles::add)
+ .cleanExpiredMetadata(true)
+ .execute();
+
+ assertThat(table.specs().keySet())
+ .as("Should have only the latest spec")
+ .containsExactly(table.spec().specId());
+ assertThat(table.schemas().keySet())
+ .as("Should have only the latest schema")
+ .containsExactly(table.schema().schemaId());
+ assertThat(deletedFiles)
+ .as("Should remove the file from first snapshot")
+ .contains(FILE_A.location())
+ .doesNotContain(fileInNewSpec.location());
+ }
}