This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new be55a5d4e7 Flink: Backport expose cleanExpiredMetadata for snapshot expiration (#13729)
be55a5d4e7 is described below

commit be55a5d4e73e71aac5bcc1bdd14568895ab72e86
Author: gaborkaszab <gaborkas...@gmail.com>
AuthorDate: Mon Aug 4 18:16:30 2025 +0200

    Flink: Backport expose cleanExpiredMetadata for snapshot expiration (#13729)
    
    Backports #13569
---
 .../flink/maintenance/api/ExpireSnapshots.java     | 16 +++++++-
 .../operator/ExpireSnapshotsProcessor.java         | 11 +++++-
 .../maintenance/operator/OperatorTestBase.java     |  6 +++
 .../operator/TestExpireSnapshotsProcessor.java     | 44 +++++++++++++++++++++-
 .../flink/maintenance/api/ExpireSnapshots.java     | 16 +++++++-
 .../operator/ExpireSnapshotsProcessor.java         | 11 +++++-
 .../maintenance/operator/OperatorTestBase.java     |  6 +++
 .../operator/TestExpireSnapshotsProcessor.java     | 44 +++++++++++++++++++++-
 8 files changed, 146 insertions(+), 8 deletions(-)

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index f49bc953ec..628a911414 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -46,6 +46,7 @@ public class ExpireSnapshots {
     private Integer numSnapshots = null;
     private Integer planningWorkerPoolSize;
     private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT;
+    private Boolean cleanExpiredMetadata = null;
 
     @Override
     String maintenanceTaskName() {
@@ -94,6 +95,18 @@ public class ExpireSnapshots {
       return this;
     }
 
+    /**
+     * Expires unused table metadata such as partition specs and schemas.
+     *
+     * @param newCleanExpiredMetadata remove unused partition specs, schemas, or other metadata when
+     *     true
+     * @return this for method chaining
+     */
+    public Builder cleanExpiredMetadata(boolean newCleanExpiredMetadata) {
+      this.cleanExpiredMetadata = newCleanExpiredMetadata;
+      return this;
+    }
+
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");
@@ -105,7 +118,8 @@ public class ExpireSnapshots {
                       tableLoader(),
                      maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
                       numSnapshots,
-                      planningWorkerPoolSize))
+                      planningWorkerPoolSize,
+                      cleanExpiredMetadata))
               .name(operatorName(EXECUTOR_OPERATOR_NAME))
               .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
               .slotSharingGroup(slotSharingGroup())
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 098d32e3b6..154512e27b 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -52,6 +52,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
   private final Long maxSnapshotAgeMs;
   private final Integer numSnapshots;
   private final Integer plannerPoolSize;
+  private final Boolean cleanExpiredMetadata;
   private transient ExecutorService plannerPool;
   private transient Table table;
 
@@ -59,13 +60,15 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
       TableLoader tableLoader,
       Long maxSnapshotAgeMs,
       Integer numSnapshots,
-      Integer plannerPoolSize) {
-    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+      Integer plannerPoolSize,
+      Boolean cleanExpiredMetadata) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
 
     this.tableLoader = tableLoader;
     this.maxSnapshotAgeMs = maxSnapshotAgeMs;
     this.numSnapshots = numSnapshots;
     this.plannerPoolSize = plannerPoolSize;
+    this.cleanExpiredMetadata = cleanExpiredMetadata;
   }
 
   @Override
@@ -92,6 +95,10 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
         expireSnapshots = expireSnapshots.retainLast(numSnapshots);
       }
 
+      if (cleanExpiredMetadata != null) {
+        expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
+      }
+
       AtomicLong deleteFileCounter = new AtomicLong(0L);
       expireSnapshots
           .planWith(plannerPool)
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index b1a2b710fa..3f333fffbd 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -165,6 +165,12 @@ public class OperatorTestBase {
     table.refresh();
   }
 
+  protected void insert(Table table, Integer id, String data, String extra) throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+        .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra)));
+    table.refresh();
+  }
+
   /**
    * For the same identifier column id this methods simulate the following row 
operations: <tr>
    * <li>add an equality delete on oldData
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java
index d312fc312c..f073272a70 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.maintenance.api.TaskResult;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -46,7 +47,7 @@ class TestExpireSnapshotsProcessor extends OperatorTestBase {
     Queue<StreamRecord<String>> deletes;
     try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness =
         ProcessFunctionTestHarnesses.forProcessFunction(
-            new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10))) {
+            new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10, false))) {
       testHarness.open();
 
       if (!success) {
@@ -77,4 +78,45 @@ class TestExpireSnapshotsProcessor extends OperatorTestBase {
       assertThat(deletes).isNull();
     }
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCleanExpiredMetadata(boolean cleanExpiredMetadata) throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    table.updateSchema().addColumn("extra", Types.StringType.get()).commit();
+    insert(table, 2, "b", "x");
+
+    assertThat(table.schemas()).hasSize(2);
+
+    List<TaskResult> actual;
+    Queue<StreamRecord<String>> deletes;
+    try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness =
+        ProcessFunctionTestHarnesses.forProcessFunction(
+            new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10, cleanExpiredMetadata))) {
+      testHarness.open();
+
+      testHarness.processElement(Trigger.create(10, 11), System.currentTimeMillis());
+      deletes = testHarness.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM);
+      actual = testHarness.extractOutputValues();
+    }
+
+    assertThat(actual).hasSize(1);
+    TaskResult result = actual.get(0);
+    assertThat(result.startEpoch()).isEqualTo(10);
+    assertThat(result.taskIndex()).isEqualTo(11);
+    assertThat(result.success()).isEqualTo(true);
+    assertThat(result.exceptions()).isNotNull().isEmpty();
+
+    table.refresh();
+    Set<Snapshot> snapshots = Sets.newHashSet(table.snapshots());
+    assertThat(snapshots).hasSize(1);
+    assertThat(deletes).hasSize(1);
+
+    if (cleanExpiredMetadata) {
+      assertThat(table.schemas().values()).containsExactly(table.schema());
+    } else {
+      assertThat(table.schemas()).hasSize(2);
+    }
+  }
 }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index f49bc953ec..628a911414 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -46,6 +46,7 @@ public class ExpireSnapshots {
     private Integer numSnapshots = null;
     private Integer planningWorkerPoolSize;
     private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT;
+    private Boolean cleanExpiredMetadata = null;
 
     @Override
     String maintenanceTaskName() {
@@ -94,6 +95,18 @@ public class ExpireSnapshots {
       return this;
     }
 
+    /**
+     * Expires unused table metadata such as partition specs and schemas.
+     *
+     * @param newCleanExpiredMetadata remove unused partition specs, schemas, or other metadata when
+     *     true
+     * @return this for method chaining
+     */
+    public Builder cleanExpiredMetadata(boolean newCleanExpiredMetadata) {
+      this.cleanExpiredMetadata = newCleanExpiredMetadata;
+      return this;
+    }
+
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");
@@ -105,7 +118,8 @@ public class ExpireSnapshots {
                       tableLoader(),
                      maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
                       numSnapshots,
-                      planningWorkerPoolSize))
+                      planningWorkerPoolSize,
+                      cleanExpiredMetadata))
               .name(operatorName(EXECUTOR_OPERATOR_NAME))
               .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
               .slotSharingGroup(slotSharingGroup())
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index 098d32e3b6..154512e27b 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -52,6 +52,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
   private final Long maxSnapshotAgeMs;
   private final Integer numSnapshots;
   private final Integer plannerPoolSize;
+  private final Boolean cleanExpiredMetadata;
   private transient ExecutorService plannerPool;
   private transient Table table;
 
@@ -59,13 +60,15 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
       TableLoader tableLoader,
       Long maxSnapshotAgeMs,
       Integer numSnapshots,
-      Integer plannerPoolSize) {
-    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
+      Integer plannerPoolSize,
+      Boolean cleanExpiredMetadata) {
+    Preconditions.checkNotNull(tableLoader, "Table loader should not be null");
 
     this.tableLoader = tableLoader;
     this.maxSnapshotAgeMs = maxSnapshotAgeMs;
     this.numSnapshots = numSnapshots;
     this.plannerPoolSize = plannerPoolSize;
+    this.cleanExpiredMetadata = cleanExpiredMetadata;
   }
 
   @Override
@@ -92,6 +95,10 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
         expireSnapshots = expireSnapshots.retainLast(numSnapshots);
       }
 
+      if (cleanExpiredMetadata != null) {
+        expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
+      }
+
       AtomicLong deleteFileCounter = new AtomicLong(0L);
       expireSnapshots
           .planWith(plannerPool)
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index b1a2b710fa..3f333fffbd 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -165,6 +165,12 @@ public class OperatorTestBase {
     table.refresh();
   }
 
+  protected void insert(Table table, Integer id, String data, String extra) throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+        .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra)));
+    table.refresh();
+  }
+
   /**
    * For the same identifier column id this methods simulate the following row 
operations: <tr>
    * <li>add an equality delete on oldData
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java
index d312fc312c..f073272a70 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.maintenance.api.TaskResult;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -46,7 +47,7 @@ class TestExpireSnapshotsProcessor extends OperatorTestBase {
     Queue<StreamRecord<String>> deletes;
     try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness =
         ProcessFunctionTestHarnesses.forProcessFunction(
-            new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10))) {
+            new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10, false))) {
       testHarness.open();
 
       if (!success) {
@@ -77,4 +78,45 @@ class TestExpireSnapshotsProcessor extends OperatorTestBase {
       assertThat(deletes).isNull();
     }
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCleanExpiredMetadata(boolean cleanExpiredMetadata) throws Exception {
+    Table table = createTable();
+    insert(table, 1, "a");
+    table.updateSchema().addColumn("extra", Types.StringType.get()).commit();
+    insert(table, 2, "b", "x");
+
+    assertThat(table.schemas()).hasSize(2);
+
+    List<TaskResult> actual;
+    Queue<StreamRecord<String>> deletes;
+    try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness =
+        ProcessFunctionTestHarnesses.forProcessFunction(
+            new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10, cleanExpiredMetadata))) {
+      testHarness.open();
+
+      testHarness.processElement(Trigger.create(10, 11), System.currentTimeMillis());
+      deletes = testHarness.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM);
+      actual = testHarness.extractOutputValues();
+    }
+
+    assertThat(actual).hasSize(1);
+    TaskResult result = actual.get(0);
+    assertThat(result.startEpoch()).isEqualTo(10);
+    assertThat(result.taskIndex()).isEqualTo(11);
+    assertThat(result.success()).isEqualTo(true);
+    assertThat(result.exceptions()).isNotNull().isEmpty();
+
+    table.refresh();
+    Set<Snapshot> snapshots = Sets.newHashSet(table.snapshots());
+    assertThat(snapshots).hasSize(1);
+    assertThat(deletes).hasSize(1);
+
+    if (cleanExpiredMetadata) {
+      assertThat(table.schemas().values()).containsExactly(table.schema());
+    } else {
+      assertThat(table.schemas()).hasSize(2);
+    }
+  }
 }

Reply via email to