This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a048825fc Spark, Flink: replace deprecated cleanExpiredFiles in 
expireSnapshots (#14832)
9a048825fc is described below

commit 9a048825fc8d83982ca6d4ea27c07c1c573cd8a6
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Fri Dec 12 15:07:03 2025 -0800

    Spark, Flink: replace deprecated cleanExpiredFiles in expireSnapshots 
(#14832)
---
 core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java        | 3 ++-
 .../iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java   | 2 +-
 .../iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java   | 2 +-
 .../iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java   | 2 +-
 .../org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java   | 3 ++-
 .../org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java   | 3 ++-
 6 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java 
b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index c2fd24856f..5d20bc15a9 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -38,6 +38,7 @@ import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.FilesTable;
@@ -1650,7 +1651,7 @@ public abstract class CatalogTests<C extends Catalog & 
SupportsNamespaces> {
     table.newAppend().appendFile(anotherFile).commit();
     table
         .expireSnapshots()
-        .cleanExpiredFiles(false)
+        .cleanupLevel(ExpireSnapshots.CleanupLevel.NONE)
         .expireOlderThan(table.currentSnapshot().timestampMillis())
         .cleanExpiredMetadata(true)
         .commit();
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 154512e27b..36c8215755 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -107,7 +107,7 @@ public class ExpireSnapshotsProcessor extends 
ProcessFunction<Trigger, TaskResul
                 ctx.output(DELETE_STREAM, file);
                 deleteFileCounter.incrementAndGet();
               })
-          .cleanExpiredFiles(true)
+          .cleanupLevel(ExpireSnapshots.CleanupLevel.ALL)
           .commit();
 
       LOG.info(
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 2db9585ebd..8cbd8e269a 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -107,7 +107,7 @@ public class ExpireSnapshotsProcessor extends 
ProcessFunction<Trigger, TaskResul
                 ctx.output(DELETE_STREAM, file);
                 deleteFileCounter.incrementAndGet();
               })
-          .cleanExpiredFiles(true)
+          .cleanupLevel(ExpireSnapshots.CleanupLevel.ALL)
           .commit();
 
       LOG.info(
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 2db9585ebd..8cbd8e269a 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -107,7 +107,7 @@ public class ExpireSnapshotsProcessor extends 
ProcessFunction<Trigger, TaskResul
                 ctx.output(DELETE_STREAM, file);
                 deleteFileCounter.incrementAndGet();
               })
-          .cleanExpiredFiles(true)
+          .cleanupLevel(ExpireSnapshots.CleanupLevel.ALL)
           .commit();
 
       LOG.info(
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 5f4d4ec151..e49e732673 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ExpireSnapshots.CleanupLevel;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -169,7 +170,7 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
         expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
       }
 
-      expireSnapshots.cleanExpiredFiles(false).commit();
+      expireSnapshots.cleanupLevel(CleanupLevel.NONE).commit();
 
       // fetch valid files after expiration
       TableMetadata updatedMetadata = ops.refresh();
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
index 5f4d4ec151..e49e732673 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/ExpireSnapshotsSparkAction.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import org.apache.iceberg.ExpireSnapshots.CleanupLevel;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -169,7 +170,7 @@ public class ExpireSnapshotsSparkAction extends 
BaseSparkAction<ExpireSnapshotsS
         expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
       }
 
-      expireSnapshots.cleanExpiredFiles(false).commit();
+      expireSnapshots.cleanupLevel(CleanupLevel.NONE).commit();
 
       // fetch valid files after expiration
       TableMetadata updatedMetadata = ops.refresh();

Reply via email to