This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9a048825fc Spark, Flink: replace deprecated cleanExpiredFiles in expireSnapshots (#14832)
9a048825fc is described below
commit 9a048825fc8d83982ca6d4ea27c07c1c573cd8a6
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Dec 12 15:07:03 2025 -0800
Spark, Flink: replace deprecated cleanExpiredFiles in expireSnapshots (#14832)
---
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java | 3 ++-
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java | 2 +-
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java | 2 +-
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java | 2 +-
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java | 3 ++-
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java | 3 ++-
6 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index c2fd24856f..5d20bc15a9 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.FilesTable;
@@ -1650,7 +1651,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
table.newAppend().appendFile(anotherFile).commit();
table
.expireSnapshots()
- .cleanExpiredFiles(false)
+ .cleanupLevel(ExpireSnapshots.CleanupLevel.NONE)
.expireOlderThan(table.currentSnapshot().timestampMillis())
.cleanExpiredMetadata(true)
.commit();
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 154512e27b..36c8215755 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -107,7 +107,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
ctx.output(DELETE_STREAM, file);
deleteFileCounter.incrementAndGet();
})
- .cleanExpiredFiles(true)
+ .cleanupLevel(ExpireSnapshots.CleanupLevel.ALL)
.commit();
LOG.info(
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 2db9585ebd..8cbd8e269a 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -107,7 +107,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
ctx.output(DELETE_STREAM, file);
deleteFileCounter.incrementAndGet();
})
- .cleanExpiredFiles(true)
+ .cleanupLevel(ExpireSnapshots.CleanupLevel.ALL)
.commit();
LOG.info(
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 2db9585ebd..8cbd8e269a 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -107,7 +107,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
ctx.output(DELETE_STREAM, file);
deleteFileCounter.incrementAndGet();
})
- .cleanExpiredFiles(true)
+ .cleanupLevel(ExpireSnapshots.CleanupLevel.ALL)
.commit();
LOG.info(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 5f4d4ec151..e49e732673 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import org.apache.iceberg.ExpireSnapshots.CleanupLevel;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -169,7 +170,7 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
}
- expireSnapshots.cleanExpiredFiles(false).commit();
+ expireSnapshots.cleanupLevel(CleanupLevel.NONE).commit();
// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 5f4d4ec151..e49e732673 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import org.apache.iceberg.ExpireSnapshots.CleanupLevel;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
@@ -169,7 +170,7 @@ public class ExpireSnapshotsSparkAction extends BaseSparkAction<ExpireSnapshotsS
expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
}
- expireSnapshots.cleanExpiredFiles(false).commit();
+ expireSnapshots.cleanupLevel(CleanupLevel.NONE).commit();
// fetch valid files after expiration
TableMetadata updatedMetadata = ops.refresh();