This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push: new be55a5d4e7 Flink: Backport expose cleanExpiredMetadata for snapshot expiration (#13729) be55a5d4e7 is described below commit be55a5d4e73e71aac5bcc1bdd14568895ab72e86 Author: gaborkaszab <gaborkas...@gmail.com> AuthorDate: Mon Aug 4 18:16:30 2025 +0200 Flink: Backport expose cleanExpiredMetadata for snapshot expiration (#13729) Backports #13569 --- .../flink/maintenance/api/ExpireSnapshots.java | 16 +++++++- .../operator/ExpireSnapshotsProcessor.java | 11 +++++- .../maintenance/operator/OperatorTestBase.java | 6 +++ .../operator/TestExpireSnapshotsProcessor.java | 44 +++++++++++++++++++++- .../flink/maintenance/api/ExpireSnapshots.java | 16 +++++++- .../operator/ExpireSnapshotsProcessor.java | 11 +++++- .../maintenance/operator/OperatorTestBase.java | 6 +++ .../operator/TestExpireSnapshotsProcessor.java | 44 +++++++++++++++++++++- 8 files changed, 146 insertions(+), 8 deletions(-) diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java index f49bc953ec..628a911414 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -46,6 +46,7 @@ public class ExpireSnapshots { private Integer numSnapshots = null; private Integer planningWorkerPoolSize; private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT; + private Boolean cleanExpiredMetadata = null; @Override String maintenanceTaskName() { @@ -94,6 +95,18 @@ public class ExpireSnapshots { return this; } + /** + * Expires unused table metadata such as partition specs and schemas. + * + * @param newCleanExpiredMetadata remove unused partition specs, schemas, or other metadata when + * true + * @return this for method chaining + */ + public Builder cleanExpiredMetadata(boolean newCleanExpiredMetadata) { + this.cleanExpiredMetadata = newCleanExpiredMetadata; + return this; + } + @Override DataStream<TaskResult> append(DataStream<Trigger> trigger) { Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null"); @@ -105,7 +118,8 @@ public class ExpireSnapshots { tableLoader(), maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(), numSnapshots, - planningWorkerPoolSize)) + planningWorkerPoolSize, + cleanExpiredMetadata)) .name(operatorName(EXECUTOR_OPERATOR_NAME)) .uid(EXECUTOR_OPERATOR_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java index 098d32e3b6..154512e27b 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java @@ -52,6 +52,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul private final Long maxSnapshotAgeMs; private final Integer numSnapshots; private final Integer plannerPoolSize; + private final Boolean cleanExpiredMetadata; private transient ExecutorService plannerPool; private transient Table table; @@ -59,13 +60,15 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul TableLoader tableLoader, Long maxSnapshotAgeMs, Integer numSnapshots, - Integer plannerPoolSize) { - Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Integer plannerPoolSize, + Boolean cleanExpiredMetadata) { + Preconditions.checkNotNull(tableLoader, "Table loader should not be null"); this.tableLoader = tableLoader; this.maxSnapshotAgeMs = maxSnapshotAgeMs; this.numSnapshots = numSnapshots; this.plannerPoolSize = plannerPoolSize; + this.cleanExpiredMetadata = cleanExpiredMetadata; } @Override @@ -92,6 +95,10 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul expireSnapshots = expireSnapshots.retainLast(numSnapshots); } + if (cleanExpiredMetadata != null) { + expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata); + } + AtomicLong deleteFileCounter = new AtomicLong(0L); expireSnapshots .planWith(plannerPool) diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index b1a2b710fa..3f333fffbd 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -165,6 +165,12 @@ public class OperatorTestBase { table.refresh(); } + protected void insert(Table table, Integer id, String data, String extra) throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra))); + table.refresh(); + } + /** * For the same identifier column id this methods simulate the following row operations: <tr> * <li>add an equality delete on oldData diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java index d312fc312c..f073272a70 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java @@ -31,6 +31,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.flink.maintenance.api.TaskResult; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -46,7 +47,7 @@ class TestExpireSnapshotsProcessor extends OperatorTestBase { Queue<StreamRecord<String>> deletes; try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness = ProcessFunctionTestHarnesses.forProcessFunction( - new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10))) { + new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10, false))) { testHarness.open(); if (!success) { @@ -77,4 +78,45 @@ class TestExpireSnapshotsProcessor extends OperatorTestBase { assertThat(deletes).isNull(); } } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCleanExpiredMetadata(boolean cleanExpiredMetadata) throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + table.updateSchema().addColumn("extra", Types.StringType.get()).commit(); + insert(table, 2, "b", "x"); + + assertThat(table.schemas()).hasSize(2); + + List<TaskResult> actual; + Queue<StreamRecord<String>> deletes; + try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10, cleanExpiredMetadata))) { + testHarness.open(); + + testHarness.processElement(Trigger.create(10, 11), System.currentTimeMillis()); + deletes = testHarness.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM); + actual = testHarness.extractOutputValues(); + } + + assertThat(actual).hasSize(1); + TaskResult result = actual.get(0); + assertThat(result.startEpoch()).isEqualTo(10); + assertThat(result.taskIndex()).isEqualTo(11); + assertThat(result.success()).isEqualTo(true); + assertThat(result.exceptions()).isNotNull().isEmpty(); + + table.refresh(); + Set<Snapshot> snapshots = Sets.newHashSet(table.snapshots()); + assertThat(snapshots).hasSize(1); + assertThat(deletes).hasSize(1); + + if (cleanExpiredMetadata) { + assertThat(table.schemas().values()).containsExactly(table.schema()); + } else { + assertThat(table.schemas()).hasSize(2); + } + } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java index f49bc953ec..628a911414 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -46,6 +46,7 @@ public class ExpireSnapshots { private Integer numSnapshots = null; private Integer planningWorkerPoolSize; private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT; + private Boolean cleanExpiredMetadata = null; @Override String maintenanceTaskName() { @@ -94,6 +95,18 @@ public class ExpireSnapshots { return this; } + /** + * Expires unused table metadata such as partition specs and schemas. + * + * @param newCleanExpiredMetadata remove unused partition specs, schemas, or other metadata when + * true + * @return this for method chaining + */ + public Builder cleanExpiredMetadata(boolean newCleanExpiredMetadata) { + this.cleanExpiredMetadata = newCleanExpiredMetadata; + return this; + } + @Override DataStream<TaskResult> append(DataStream<Trigger> trigger) { Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null"); @@ -105,7 +118,8 @@ public class ExpireSnapshots { tableLoader(), maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(), numSnapshots, - planningWorkerPoolSize)) + planningWorkerPoolSize, + cleanExpiredMetadata)) .name(operatorName(EXECUTOR_OPERATOR_NAME)) .uid(EXECUTOR_OPERATOR_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java index 098d32e3b6..154512e27b 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java @@ -52,6 +52,7 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul private final Long maxSnapshotAgeMs; private final Integer numSnapshots; private final Integer plannerPoolSize; + private final Boolean cleanExpiredMetadata; private transient ExecutorService plannerPool; private transient Table table; @@ -59,13 +60,15 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul TableLoader tableLoader, Long maxSnapshotAgeMs, Integer numSnapshots, - Integer plannerPoolSize) { - Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Integer plannerPoolSize, + Boolean cleanExpiredMetadata) { + Preconditions.checkNotNull(tableLoader, "Table loader should not be null"); this.tableLoader = tableLoader; this.maxSnapshotAgeMs = maxSnapshotAgeMs; this.numSnapshots = numSnapshots; this.plannerPoolSize = plannerPoolSize; + this.cleanExpiredMetadata = cleanExpiredMetadata; } @Override @@ -92,6 +95,10 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul expireSnapshots = expireSnapshots.retainLast(numSnapshots); } + if (cleanExpiredMetadata != null) { + expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata); + } + AtomicLong deleteFileCounter = new AtomicLong(0L); expireSnapshots .planWith(plannerPool) diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index b1a2b710fa..3f333fffbd 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -165,6 +165,12 @@ public class OperatorTestBase { table.refresh(); } + protected void insert(Table table, Integer id, String data, String extra) throws IOException { + new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir) + .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra))); + table.refresh(); + } + /** * For the same identifier column id this methods simulate the following row operations: <tr> * <li>add an equality delete on oldData diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java index d312fc312c..f073272a70 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java @@ -31,6 +31,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.flink.maintenance.api.TaskResult; import org.apache.iceberg.flink.maintenance.api.Trigger; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -46,7 +47,7 @@ class TestExpireSnapshotsProcessor extends OperatorTestBase { Queue<StreamRecord<String>> deletes; try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness = ProcessFunctionTestHarnesses.forProcessFunction( - new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10))) { + new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10, false))) { testHarness.open(); if (!success) { @@ -77,4 +78,45 @@ class TestExpireSnapshotsProcessor extends OperatorTestBase { assertThat(deletes).isNull(); } } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCleanExpiredMetadata(boolean cleanExpiredMetadata) throws Exception { + Table table = createTable(); + insert(table, 1, "a"); + table.updateSchema().addColumn("extra", Types.StringType.get()).commit(); + insert(table, 2, "b", "x"); + + assertThat(table.schemas()).hasSize(2); + + List<TaskResult> actual; + Queue<StreamRecord<String>> deletes; + try (OneInputStreamOperatorTestHarness<Trigger, TaskResult> testHarness = + ProcessFunctionTestHarnesses.forProcessFunction( + new ExpireSnapshotsProcessor(tableLoader(), 0L, 1, 10, cleanExpiredMetadata))) { + testHarness.open(); + + testHarness.processElement(Trigger.create(10, 11), System.currentTimeMillis()); + deletes = testHarness.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM); + actual = testHarness.extractOutputValues(); + } + + assertThat(actual).hasSize(1); + TaskResult result = actual.get(0); + assertThat(result.startEpoch()).isEqualTo(10); + assertThat(result.taskIndex()).isEqualTo(11); + assertThat(result.success()).isEqualTo(true); + assertThat(result.exceptions()).isNotNull().isEmpty(); + + table.refresh(); + Set<Snapshot> snapshots = Sets.newHashSet(table.snapshots()); + assertThat(snapshots).hasSize(1); + assertThat(deletes).hasSize(1); + + if (cleanExpiredMetadata) { + assertThat(table.schemas().values()).containsExactly(table.schema()); + } else { + assertThat(table.schemas()).hasSize(2); + } + } }