This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new 652691c2ab Flink: Backport: Allow arbitrary post-commit maintenance tasks via IcebergSink Builder (#15566) (#15667)
652691c2ab is described below
commit 652691c2ab06176f16f1db418d87d70cd94b176a
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue Mar 17 18:31:34 2026 +0100
    Flink: Backport: Allow arbitrary post-commit maintenance tasks via IcebergSink Builder (#15566) (#15667)
---
.../org/apache/iceberg/flink/FlinkWriteConf.java | 18 ++
.../apache/iceberg/flink/FlinkWriteOptions.java | 16 +-
.../flink/maintenance/api/DeleteOrphanFiles.java | 26 ++-
.../maintenance/api/DeleteOrphanFilesConfig.java | 216 +++++++++++++++++++++
.../flink/maintenance/api/ExpireSnapshots.java | 14 ++
.../maintenance/api/ExpireSnapshotsConfig.java | 151 ++++++++++++++
.../maintenance/api/FlinkMaintenanceConfig.java | 8 +
.../flink/maintenance/api/TableMaintenance.java | 12 ++
.../iceberg/flink/sink/IcebergCommitter.java | 8 +-
.../org/apache/iceberg/flink/sink/IcebergSink.java | 147 +++++++++++---
.../api/TestDeleteOrphanFilesConfig.java | 91 +++++++++
.../maintenance/api/TestExpireSnapshotsConfig.java | 84 ++++++++
...t.java => TestIcebergSinkTableMaintenance.java} | 167 +++++++++++++++-
.../org/apache/iceberg/flink/FlinkWriteConf.java | 18 ++
.../apache/iceberg/flink/FlinkWriteOptions.java | 16 +-
.../flink/maintenance/api/DeleteOrphanFiles.java | 26 ++-
.../maintenance/api/DeleteOrphanFilesConfig.java | 216 +++++++++++++++++++++
.../flink/maintenance/api/ExpireSnapshots.java | 14 ++
.../maintenance/api/ExpireSnapshotsConfig.java | 151 ++++++++++++++
.../maintenance/api/FlinkMaintenanceConfig.java | 8 +
.../flink/maintenance/api/TableMaintenance.java | 12 ++
.../iceberg/flink/sink/IcebergCommitter.java | 8 +-
.../org/apache/iceberg/flink/sink/IcebergSink.java | 147 +++++++++++---
.../api/TestDeleteOrphanFilesConfig.java | 91 +++++++++
.../maintenance/api/TestExpireSnapshotsConfig.java | 84 ++++++++
...t.java => TestIcebergSinkTableMaintenance.java} | 167 +++++++++++++++-
26 files changed, 1842 insertions(+), 74 deletions(-)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 66fd098077..990d23f2aa 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -220,6 +220,24 @@ public class FlinkWriteConf {
    return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
}
+ public boolean expireSnapshotsMode() {
+ return confParser
+ .booleanConf()
+ .option(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key())
+ .flinkConfig(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE)
+ .defaultValue(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.defaultValue())
+ .parse();
+ }
+
+ public boolean deleteOrphanFilesMode() {
+ return confParser
+ .booleanConf()
+ .option(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key())
+ .flinkConfig(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE)
+        .defaultValue(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.defaultValue())
+ .parse();
+ }
+
public boolean compactMode() {
return confParser
.booleanConf()
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index e68e64ac57..ee2aeaa450 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -23,6 +23,9 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
+import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
/** Flink sink write options */
@@ -82,7 +85,18 @@ public class FlinkWriteOptions {
ConfigOptions.key("write-parallelism").intType().noDefaultValue();
public static final ConfigOption<Boolean> COMPACTION_ENABLE =
-      ConfigOptions.key("compaction-enabled").booleanType().defaultValue(false);
+ ConfigOptions.key(RewriteDataFilesConfig.PREFIX + "enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDeprecatedKeys("compaction-enabled");
+
+ public static final ConfigOption<Boolean> EXPIRE_SNAPSHOTS_ENABLE =
+      ConfigOptions.key(ExpireSnapshotsConfig.PREFIX + "enabled").booleanType().defaultValue(false);
+
+ public static final ConfigOption<Boolean> DELETE_ORPHAN_FILES_ENABLE =
+ ConfigOptions.key(DeleteOrphanFilesConfig.PREFIX + "enabled")
+ .booleanType()
+ .defaultValue(false);
@Experimental
public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
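A minimal sketch (not part of this commit) of enabling the new tasks purely through write options; the concrete key strings are derived from FlinkMaintenanceConfig.PREFIX, whose value is not shown in this diff:

    // Hypothetical usage; "writeOptions" is the map later handed to the sink builder.
    Map<String, String> writeOptions = Maps.newHashMap();
    writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true"); // old key "compaction-enabled" still accepted
    writeOptions.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), "true");
    writeOptions.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), "true");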
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
index 2fce5e0b3e..63a267d16e 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
@@ -53,6 +53,11 @@ public class DeleteOrphanFiles {
ScanContext.builder().streaming(true).project(FILE_PATH_SCHEMA).build();
private static final Splitter COMMA_SPLITTER = Splitter.on(",");
+ static final Map<String, String> DEFAULT_EQUAL_SCHEMES =
+ ImmutableMap.of(
+ "s3n", "s3",
+ "s3a", "s3");
+
@Internal
public static final OutputTag<Exception> ERROR_STREAM =
new OutputTag<>("error-stream", TypeInformation.of(Exception.class));
@@ -79,12 +84,8 @@ public class DeleteOrphanFiles {
private Duration minAge = Duration.ofDays(3);
private int planningWorkerPoolSize = ThreadPools.WORKER_THREAD_POOL_SIZE;
private int deleteBatchSize = 1000;
- private boolean usePrefixListing = false;
- private Map<String, String> equalSchemes =
- Maps.newHashMap(
- ImmutableMap.of(
- "s3n", "s3",
- "s3a", "s3"));
+ private boolean usePrefixListing = true;
+  private Map<String, String> equalSchemes = Maps.newHashMap(DEFAULT_EQUAL_SCHEMES);
private final Map<String, String> equalAuthorities = Maps.newHashMap();
private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
@@ -189,6 +190,19 @@ public class DeleteOrphanFiles {
return this;
}
+ public Builder config(DeleteOrphanFilesConfig deleteOrphanFilesConfig) {
+ return this.scheduleOnInterval(
+            Duration.ofSeconds(deleteOrphanFilesConfig.scheduleOnIntervalSecond()))
+ .minAge(Duration.ofSeconds(deleteOrphanFilesConfig.minAgeSeconds()))
+ .deleteBatchSize(deleteOrphanFilesConfig.deleteBatchSize())
+ .usePrefixListing(deleteOrphanFilesConfig.usePrefixListing())
+ .prefixMismatchMode(deleteOrphanFilesConfig.prefixMismatchMode())
+ .location(deleteOrphanFilesConfig.location())
+        .planningWorkerPoolSize(deleteOrphanFilesConfig.planningWorkerPoolSize())
+ .equalSchemes(deleteOrphanFilesConfig.equalSchemes())
+ .equalAuthorities(deleteOrphanFilesConfig.equalAuthorities());
+ }
+
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
tableLoader().open();
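A hedged sketch of the new config(...) entry point above, assuming a FlinkMaintenanceConfig instance named flinkMaintenanceConfig:

    // Build a delete-orphan-files task entirely from parsed configuration.
    DeleteOrphanFilesConfig config = flinkMaintenanceConfig.createDeleteOrphanFilesConfig();
    DeleteOrphanFiles.Builder task = DeleteOrphanFiles.builder().config(config);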
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
new file mode 100644
index 0000000000..af34735781
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.flink.FlinkConfParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+
+public class DeleteOrphanFilesConfig {
+  public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + "delete-orphan-files.";
+
+ private static final Splitter COMMA_SPLITTER = Splitter.on(",");
+ private static final Splitter EQUALS_SPLITTER = Splitter.on("=").limit(2);
+
+  public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + "schedule.interval-second";
+ public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION =
+ ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND)
+ .longType()
+ .defaultValue(60 * 60L) // Default is 1 hour
+ .withDescription(
+              "The time interval (in seconds) between two consecutive delete orphan files operations.");
+
+ public static final String MIN_AGE_SECONDS = PREFIX + "min-age-seconds";
+ public static final ConfigOption<Long> MIN_AGE_SECONDS_OPTION =
+ ConfigOptions.key(MIN_AGE_SECONDS)
+ .longType()
+ .defaultValue(3L * 24 * 60 * 60) // Default is 3 days
+ .withDescription(
+              "The minimum age (in seconds) of files to be considered for deletion. "
+ + "Files newer than this will not be removed.");
+
+ public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size";
+ public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION =
+ ConfigOptions.key(DELETE_BATCH_SIZE)
+ .intType()
+ .defaultValue(1000)
+ .withDescription("The batch size used for deleting orphan files.");
+
+ public static final String LOCATION = PREFIX + "location";
+ public static final ConfigOption<String> LOCATION_OPTION =
+ ConfigOptions.key(LOCATION)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+              "The location to start recursive listing of candidate files for removal. "
+ + "By default, the table location is used.");
+
+  public static final String USE_PREFIX_LISTING = PREFIX + "use-prefix-listing";
+ public static final ConfigOption<Boolean> USE_PREFIX_LISTING_OPTION =
+ ConfigOptions.key(USE_PREFIX_LISTING)
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+              "Whether to use prefix listing when listing files from the file system.");
+
+  public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + "planning-worker-pool-size";
+ public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION =
+ ConfigOptions.key(PLANNING_WORKER_POOL_SIZE)
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+              "The worker pool size used for planning the scan of the ALL_FILES table. "
+ + "If not set, the shared worker pool is used.");
+
+ public static final String EQUAL_SCHEMES = PREFIX + "equal-schemes";
+ public static final ConfigOption<String> EQUAL_SCHEMES_OPTION =
+ ConfigOptions.key(EQUAL_SCHEMES)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+              "Schemes that should be considered equal, in the format 'scheme1=scheme2,scheme3=scheme4'.");
+
+ public static final String EQUAL_AUTHORITIES = PREFIX + "equal-authorities";
+ public static final ConfigOption<String> EQUAL_AUTHORITIES_OPTION =
+ ConfigOptions.key(EQUAL_AUTHORITIES)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+              "Authorities that should be considered equal, in the format 'auth1=auth2,auth3=auth4'.");
+
+  public static final String PREFIX_MISMATCH_MODE = PREFIX + "prefix-mismatch-mode";
+ public static final ConfigOption<String> PREFIX_MISMATCH_MODE_OPTION =
+ ConfigOptions.key(PREFIX_MISMATCH_MODE)
+ .stringType()
+ .defaultValue(PrefixMismatchMode.ERROR.name())
+ .withDescription(
+              "Action behavior when location prefixes (schemes/authorities) mismatch. "
+ + "Valid values: ERROR, IGNORE, DELETE.");
+
+ private final FlinkConfParser confParser;
+
+ public DeleteOrphanFilesConfig(
+      Table table, Map<String, String> writeOptions, ReadableConfig readableConfig) {
+ this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+ }
+
+ public long scheduleOnIntervalSecond() {
+ return confParser
+ .longConf()
+ .option(SCHEDULE_ON_INTERVAL_SECOND)
+ .flinkConfig(SCHEDULE_ON_INTERVAL_SECOND_OPTION)
+ .defaultValue(SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue())
+ .parse();
+ }
+
+ public long minAgeSeconds() {
+ return confParser
+ .longConf()
+ .option(MIN_AGE_SECONDS)
+ .flinkConfig(MIN_AGE_SECONDS_OPTION)
+ .defaultValue(MIN_AGE_SECONDS_OPTION.defaultValue())
+ .parse();
+ }
+
+ public int deleteBatchSize() {
+ return confParser
+ .intConf()
+ .option(DELETE_BATCH_SIZE)
+ .flinkConfig(DELETE_BATCH_SIZE_OPTION)
+ .defaultValue(DELETE_BATCH_SIZE_OPTION.defaultValue())
+ .parse();
+ }
+
+ public String location() {
+    return confParser.stringConf().option(LOCATION).flinkConfig(LOCATION_OPTION).parseOptional();
+ }
+
+ public boolean usePrefixListing() {
+ return confParser
+ .booleanConf()
+ .option(USE_PREFIX_LISTING)
+ .flinkConfig(USE_PREFIX_LISTING_OPTION)
+ .defaultValue(USE_PREFIX_LISTING_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Integer planningWorkerPoolSize() {
+ return confParser
+ .intConf()
+ .option(PLANNING_WORKER_POOL_SIZE)
+ .flinkConfig(PLANNING_WORKER_POOL_SIZE_OPTION)
+ .defaultValue(ThreadPools.WORKER_THREAD_POOL_SIZE)
+ .parse();
+ }
+
+ public Map<String, String> equalSchemes() {
+ String equalSchemes =
+ confParser
+ .stringConf()
+ .option(EQUAL_SCHEMES)
+ .flinkConfig(EQUAL_SCHEMES_OPTION)
+ .parseOptional();
+
+ return equalSchemes != null
+ ? parseKeyValuePairs(equalSchemes)
+ : Maps.newHashMap(DeleteOrphanFiles.DEFAULT_EQUAL_SCHEMES);
+ }
+
+ public Map<String, String> equalAuthorities() {
+ String equalAuthorities =
+ confParser
+ .stringConf()
+ .option(EQUAL_AUTHORITIES)
+ .flinkConfig(EQUAL_AUTHORITIES_OPTION)
+ .parseOptional();
+
+    return equalAuthorities != null ? parseKeyValuePairs(equalAuthorities) : Map.of();
+ }
+
+ public PrefixMismatchMode prefixMismatchMode() {
+ String value =
+ confParser
+ .stringConf()
+ .option(PREFIX_MISMATCH_MODE)
+ .flinkConfig(PREFIX_MISMATCH_MODE_OPTION)
+ .defaultValue(PREFIX_MISMATCH_MODE_OPTION.defaultValue())
+ .parse();
+ return PrefixMismatchMode.valueOf(value);
+ }
+
+ private static Map<String, String> parseKeyValuePairs(String value) {
+ Map<String, String> result = Maps.newHashMap();
+ for (String pair : COMMA_SPLITTER.split(value)) {
+ List<String> parts = EQUALS_SPLITTER.splitToList(pair);
+      Preconditions.checkArgument(parts.size() == 2, "Invalid key-value pair: %s", pair);
+ result.put(parts.get(0).trim(), parts.get(1).trim());
+ }
+
+ return result;
+ }
+}
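A short sketch of how this config class resolves values, assuming an open Table named "table"; per FlinkConfParser, write options take precedence over the Flink Configuration, then the option defaults apply:

    // Hypothetical usage of the key constants defined above.
    Map<String, String> opts = Maps.newHashMap();
    opts.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
    opts.put(DeleteOrphanFilesConfig.EQUAL_SCHEMES, "s3n=s3,s3a=s3");
    DeleteOrphanFilesConfig config = new DeleteOrphanFilesConfig(table, opts, new Configuration());
    long minAge = config.minAgeSeconds(); // 86400, taken from the write option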
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index 628a911414..c84932f96f 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.maintenance.api;
import java.time.Duration;
+import java.util.Optional;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -107,6 +108,19 @@ public class ExpireSnapshots {
return this;
}
+ public Builder config(ExpireSnapshotsConfig expireSnapshotsConfig) {
+    return this.scheduleOnCommitCount(expireSnapshotsConfig.scheduleOnCommitCount())
+        .scheduleOnInterval(Duration.ofSeconds(expireSnapshotsConfig.scheduleOnIntervalSecond()))
+ .deleteBatchSize(expireSnapshotsConfig.deleteBatchSize())
+ .maxSnapshotAge(
+            Optional.ofNullable(expireSnapshotsConfig.maxSnapshotAgeSeconds())
+ .map(Duration::ofSeconds)
+ .orElse(null))
+ .retainLast(expireSnapshotsConfig.retainLast())
+ .cleanExpiredMetadata(expireSnapshotsConfig.cleanExpiredMetadata())
+        .planningWorkerPoolSize(expireSnapshotsConfig.planningWorkerPoolSize());
+ }
+
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
    Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
new file mode 100644
index 0000000000..13436975e1
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkConfParser;
+import org.apache.iceberg.util.ThreadPools;
+
+public class ExpireSnapshotsConfig {
+  public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + "expire-snapshots.";
+
+  public static final String SCHEDULE_ON_COMMIT_COUNT = PREFIX + "schedule.commit-count";
+ public static final ConfigOption<Integer> SCHEDULE_ON_COMMIT_COUNT_OPTION =
+ ConfigOptions.key(SCHEDULE_ON_COMMIT_COUNT)
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+              "The number of commits after which to trigger a new expire snapshots operation.");
+
+  public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + "schedule.interval-second";
+ public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION =
+ ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND)
+ .longType()
+ .defaultValue(60 * 60L) // Default is 1 hour
+ .withDescription(
+              "The time interval (in seconds) between two consecutive expire snapshots operations.");
+
+  public static final String MAX_SNAPSHOT_AGE_SECONDS = PREFIX + "max-snapshot-age-seconds";
+ public static final ConfigOption<Long> MAX_SNAPSHOT_AGE_SECONDS_OPTION =
+ ConfigOptions.key(MAX_SNAPSHOT_AGE_SECONDS)
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "The maximum age (in seconds) of snapshots to retain. "
+ + "Snapshots older than this will be expired.");
+
+ public static final String RETAIN_LAST = PREFIX + "retain-last";
+ public static final ConfigOption<Integer> RETAIN_LAST_OPTION =
+ ConfigOptions.key(RETAIN_LAST)
+ .intType()
+ .noDefaultValue()
+ .withDescription("The minimum number of snapshots to retain.");
+
+ public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size";
+ public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION =
+ ConfigOptions.key(DELETE_BATCH_SIZE)
+ .intType()
+ .defaultValue(1000)
+ .withDescription("The batch size used for deleting expired files.");
+
+  public static final String CLEAN_EXPIRED_METADATA = PREFIX + "clean-expired-metadata";
+ public static final ConfigOption<Boolean> CLEAN_EXPIRED_METADATA_OPTION =
+ ConfigOptions.key(CLEAN_EXPIRED_METADATA)
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+              "Whether to clean expired metadata such as partition specs and schemas.");
+
+  public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + "planning-worker-pool-size";
+ public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION =
+ ConfigOptions.key(PLANNING_WORKER_POOL_SIZE)
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The worker pool size used to calculate the files to delete. "
+ + "If not set, the shared worker pool is used.");
+
+ private final FlinkConfParser confParser;
+
+ public ExpireSnapshotsConfig(
+      Table table, Map<String, String> writeOptions, ReadableConfig readableConfig) {
+ this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+ }
+
+ public int scheduleOnCommitCount() {
+ return confParser
+ .intConf()
+ .option(SCHEDULE_ON_COMMIT_COUNT)
+ .flinkConfig(SCHEDULE_ON_COMMIT_COUNT_OPTION)
+ .defaultValue(SCHEDULE_ON_COMMIT_COUNT_OPTION.defaultValue())
+ .parse();
+ }
+
+ public long scheduleOnIntervalSecond() {
+ return confParser
+ .longConf()
+ .option(SCHEDULE_ON_INTERVAL_SECOND)
+ .flinkConfig(SCHEDULE_ON_INTERVAL_SECOND_OPTION)
+ .defaultValue(SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Long maxSnapshotAgeSeconds() {
+ return confParser
+ .longConf()
+ .option(MAX_SNAPSHOT_AGE_SECONDS)
+ .flinkConfig(MAX_SNAPSHOT_AGE_SECONDS_OPTION)
+ .parseOptional();
+ }
+
+ public Integer retainLast() {
+    return confParser.intConf().option(RETAIN_LAST).flinkConfig(RETAIN_LAST_OPTION).parseOptional();
+ }
+
+ public int deleteBatchSize() {
+ return confParser
+ .intConf()
+ .option(DELETE_BATCH_SIZE)
+ .flinkConfig(DELETE_BATCH_SIZE_OPTION)
+ .defaultValue(DELETE_BATCH_SIZE_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Boolean cleanExpiredMetadata() {
+ return confParser
+ .booleanConf()
+ .option(CLEAN_EXPIRED_METADATA)
+ .flinkConfig(CLEAN_EXPIRED_METADATA_OPTION)
+ .defaultValue(CLEAN_EXPIRED_METADATA_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Integer planningWorkerPoolSize() {
+ return confParser
+ .intConf()
+ .option(PLANNING_WORKER_POOL_SIZE)
+ .flinkConfig(PLANNING_WORKER_POOL_SIZE_OPTION)
+ .defaultValue(ThreadPools.WORKER_THREAD_POOL_SIZE)
+ .parse();
+ }
+}
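As with the orphan-files config, a brief hypothetical read of the defaults and optional keys, assuming an open Table named "table":

    ExpireSnapshotsConfig config =
        new ExpireSnapshotsConfig(table, Maps.newHashMap(), new Configuration());
    config.scheduleOnCommitCount(); // 10, the option default
    config.maxSnapshotAgeSeconds(); // null: no default, read via parseOptional()
    config.retainLast();            // null: no default, read via parseOptional()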
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
index 0c88abf820..e6f536273e 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
@@ -122,6 +122,14 @@ public class FlinkMaintenanceConfig {
return new RewriteDataFilesConfig(table, writeProperties, readableConfig);
}
+ public ExpireSnapshotsConfig createExpireSnapshotsConfig() {
+ return new ExpireSnapshotsConfig(table, writeProperties, readableConfig);
+ }
+
+ public DeleteOrphanFilesConfig createDeleteOrphanFilesConfig() {
+ return new DeleteOrphanFilesConfig(table, writeProperties, readableConfig);
+ }
+
public LockConfig createLockConfig() {
return new LockConfig(table, writeProperties, readableConfig);
}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index ab5159be12..d98a1c9ab4 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.maintenance.api;
import java.io.IOException;
import java.time.Duration;
+import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
@@ -252,6 +253,17 @@ public class TableMaintenance {
return this;
}
+ /**
+ * Adds multiple tasks with the given schedules.
+ *
+ * @param tasks to add
+ */
+ public Builder add(Collection<MaintenanceTaskBuilder<?>> tasks) {
+ Preconditions.checkNotNull(tasks, "Tasks collection should not be null");
+ taskBuilders.addAll(tasks);
+ return this;
+ }
+
/** Builds the task graph for the maintenance tasks. */
public void append() throws IOException {
      Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task");
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
index c05e7d9180..8e45a2db30 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
@@ -78,7 +78,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
private int maxContinuousEmptyCommits;
private ExecutorService workerPool;
private int continuousEmptyCheckpoints = 0;
- private boolean compactMode = false;
+ private final boolean tableMaintenanceEnabled;
IcebergCommitter(
TableLoader tableLoader,
@@ -88,7 +88,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
int workerPoolSize,
String sinkId,
IcebergFilesCommitterMetrics committerMetrics,
- boolean compactMode) {
+ boolean tableMaintenanceEnabled) {
this.branch = branch;
this.snapshotProperties = snapshotProperties;
this.replacePartitions = replacePartitions;
@@ -107,7 +107,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
ThreadPools.newFixedThreadPool(
"iceberg-committer-pool-" + table.name() + "-" + sinkId,
workerPoolSize);
this.continuousEmptyCheckpoints = 0;
- this.compactMode = compactMode;
+ this.tableMaintenanceEnabled = tableMaintenanceEnabled;
}
@Override
@@ -177,7 +177,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
committerMetrics.updateCommitSummary(summary);
}
- if (!compactMode) {
+ if (!tableMaintenanceEnabled) {
      FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId);
}
}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index dd20853f05..0d3e4a34d9 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -72,8 +72,13 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
import org.apache.iceberg.flink.maintenance.api.FlinkMaintenanceConfig;
import org.apache.iceberg.flink.maintenance.api.LockConfig;
+import org.apache.iceberg.flink.maintenance.api.MaintenanceTaskBuilder;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
@@ -87,6 +92,7 @@ import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -158,14 +164,15 @@ public class IcebergSink
private final String branch;
private final boolean overwriteMode;
private final int workerPoolSize;
- private final boolean compactMode;
- private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
-
+ private final boolean maintenanceEnabled;
private final Table table;
  // This should only be used for logging/error messages. For any actual logic always use
// equalityFieldIds instead.
private final Set<String> equalityFieldColumns;
+ private final transient List<MaintenanceTaskBuilder<?>> maintenanceTasks;
+ private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
+
private IcebergSink(
TableLoader tableLoader,
Table table,
@@ -178,6 +185,7 @@ public class IcebergSink
Set<Integer> equalityFieldIds,
String branch,
boolean overwriteMode,
+ List<MaintenanceTaskBuilder<?>> maintenanceTasks,
FlinkMaintenanceConfig flinkMaintenanceConfig,
Set<String> equalityFieldColumns) {
this.tableLoader = tableLoader;
@@ -199,7 +207,8 @@ public class IcebergSink
    // This is used to separate files generated by different sinks writing the same table.
// Also used to generate the aggregator operator name
this.sinkId = UUID.randomUUID().toString();
- this.compactMode = flinkWriteConf.compactMode();
+ this.maintenanceEnabled = !maintenanceTasks.isEmpty();
+ this.maintenanceTasks = maintenanceTasks;
this.flinkMaintenanceConfig = flinkMaintenanceConfig;
this.equalityFieldColumns = equalityFieldColumns;
}
@@ -238,7 +247,7 @@ public class IcebergSink
workerPoolSize,
sinkId,
metrics,
- compactMode);
+ maintenanceEnabled);
}
@Override
@@ -250,7 +259,7 @@ public class IcebergSink
public void addPostCommitTopology(
DataStream<CommittableMessage<IcebergCommittable>> committables) {
- if (!compactMode) {
+ if (maintenanceTasks.isEmpty()) {
return;
}
@@ -264,26 +273,21 @@ public class IcebergSink
.uid(postCommitUid)
.forceNonParallel();
try {
- RewriteDataFilesConfig rewriteDataFilesConfig =
- flinkMaintenanceConfig.createRewriteDataFilesConfig();
- RewriteDataFiles.Builder rewriteBuilder =
- RewriteDataFiles.builder().config(rewriteDataFilesConfig);
-
LockConfig lockConfig = flinkMaintenanceConfig.createLockConfig();
      String tableMaintenanceUid = String.format("TableMaintenance : %s", suffix);
- TableMaintenance.Builder builder =
- StringUtils.isNotEmpty(lockConfig.lockType())
- ? TableMaintenance.forChangeStream(
- tableChangeStream,
- tableLoader,
- LockFactoryBuilder.build(lockConfig, table.name()))
- .uidSuffix(tableMaintenanceUid)
- .add(rewriteBuilder)
-              : TableMaintenance.forChangeStream(tableChangeStream, tableLoader)
- .uidSuffix(tableMaintenanceUid)
- .add(rewriteBuilder);
+
+ TableMaintenance.Builder builder;
+ if (StringUtils.isNotEmpty(lockConfig.lockType())) {
+ builder =
+ TableMaintenance.forChangeStream(
+                  tableChangeStream, tableLoader, LockFactoryBuilder.build(lockConfig, table.name()));
+ } else {
+        builder = TableMaintenance.forChangeStream(tableChangeStream, tableLoader);
+ }
builder
+ .uidSuffix(tableMaintenanceUid)
+ .add(maintenanceTasks)
.rateLimit(Duration.ofSeconds(flinkMaintenanceConfig.rateLimit()))
.lockCheckDelay(Duration.ofSeconds(flinkMaintenanceConfig.lockCheckDelay()))
.slotSharingGroup(flinkMaintenanceConfig.slotSharingGroup())
@@ -344,6 +348,7 @@ public class IcebergSink
private final Map<String, String> snapshotSummary = Maps.newHashMap();
private ReadableConfig readableConfig = new Configuration();
private List<String> equalityFieldColumns = null;
+    private final List<MaintenanceTaskBuilder<?>> maintenanceTasks = Lists.newArrayList();
private Builder() {}
@@ -626,6 +631,85 @@ public class IcebergSink
return this;
}
+ /**
+     * Enables or disables compaction (rewriting data files) as a post-commit maintenance task.
+ *
+ * @param enabled whether to enable compaction
+ * @see RewriteDataFilesConfig for the default config.
+     * @deprecated See {@code rewriteDataFiles(..)}
+ */
+ @Deprecated
+ public Builder compaction(boolean enabled) {
+      writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), Boolean.toString(enabled));
+ return this;
+ }
+
+ /**
+     * Enables rewriting data files (compaction) as a post-commit maintenance task.
+ *
+ * @see RewriteDataFilesConfig for the default config.
+ */
+ public Builder rewriteDataFiles() {
+ writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
+ return this;
+ }
+
+ /**
+     * Enables rewriting data files (compaction) as a post-commit maintenance task.
+ *
+     * @param config task-specific configuration, see {@link RewriteDataFilesConfig} for available
+ * keys
+ */
+ public Builder rewriteDataFiles(Map<String, String> config) {
+ rewriteDataFiles();
+ writeOptions.putAll(config);
+ return this;
+ }
+
+ /**
+ * Enables expire snapshots as a post-commit maintenance task.
+ *
+ * @see ExpireSnapshotsConfig for the default config.
+ */
+ public Builder expireSnapshots() {
+      writeOptions.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), "true");
+ return this;
+ }
+
+ /**
+     * Enables expire snapshots as a post-commit maintenance task.
+ *
+     * @param config task-specific configuration, see {@link ExpireSnapshotsConfig} for available
+ * keys
+ */
+ public Builder expireSnapshots(Map<String, String> config) {
+ expireSnapshots();
+ writeOptions.putAll(config);
+ return this;
+ }
+
+ /**
+ * Enables delete orphan files as a post-commit maintenance task.
+ *
+ * @see DeleteOrphanFilesConfig for the default config.
+ */
+ public Builder deleteOrphanFiles() {
+      writeOptions.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), "true");
+ return this;
+ }
+
+ /**
+ * Enables delete orphan files as a post-commit maintenance task.
+ *
+     * @param config task-specific configuration, see {@link DeleteOrphanFilesConfig} for available
+ * keys.
+ */
+ public Builder deleteOrphanFiles(Map<String, String> config) {
+ deleteOrphanFiles();
+ writeOptions.putAll(config);
+ return this;
+ }
+
@Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
@@ -682,6 +766,24 @@ public class IcebergSink
FlinkMaintenanceConfig flinkMaintenanceConfig =
new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
+ if (flinkWriteConf.compactMode()) {
+ RewriteDataFilesConfig rewriteDataFilesConfig =
+ flinkMaintenanceConfig.createRewriteDataFilesConfig();
+      maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
+ }
+
+ if (flinkWriteConf.expireSnapshotsMode()) {
+ ExpireSnapshotsConfig expireSnapshotsConfig =
+ flinkMaintenanceConfig.createExpireSnapshotsConfig();
+      maintenanceTasks.add(ExpireSnapshots.builder().config(expireSnapshotsConfig));
+ }
+
+ if (flinkWriteConf.deleteOrphanFilesMode()) {
+ DeleteOrphanFilesConfig deleteOrphanFilesConfig =
+ flinkMaintenanceConfig.createDeleteOrphanFilesConfig();
+      maintenanceTasks.add(DeleteOrphanFiles.builder().config(deleteOrphanFilesConfig));
+ }
+
Set<String> equalityFieldColumnsSet =
          equalityFieldColumns != null ? Sets.newHashSet(equalityFieldColumns) : null;
@@ -699,6 +801,7 @@ public class IcebergSink
equalityFieldIds,
flinkWriteConf.branch(),
overwriteMode,
+ maintenanceTasks,
flinkMaintenanceConfig,
equalityFieldColumnsSet);
}
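Putting the builder additions together, a hedged end-to-end sketch; dataStream, table, and tableLoader are assumed to exist, and ImmutableMap refers to the relocated Guava used throughout this module:

    // Enable all three post-commit maintenance tasks directly from the sink builder.
    IcebergSink.forRowData(dataStream)
        .table(table)
        .tableLoader(tableLoader)
        .rewriteDataFiles()
        .expireSnapshots(ImmutableMap.of(ExpireSnapshotsConfig.RETAIN_LAST, "5"))
        .deleteOrphanFiles()
        .append();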
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
new file mode 100644
index 0000000000..f26f608ffa
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestDeleteOrphanFilesConfig extends OperatorTestBase {
+ private Table table;
+ private Map<String, String> input = Maps.newHashMap();
+
+ @BeforeEach
+ public void before() {
+ this.table = createTable();
+ input.put(DeleteOrphanFilesConfig.SCHEDULE_ON_INTERVAL_SECOND, "60");
+ input.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+ input.put(DeleteOrphanFilesConfig.DELETE_BATCH_SIZE, "500");
+ input.put(DeleteOrphanFilesConfig.LOCATION, "/tmp/test-location");
+ input.put(DeleteOrphanFilesConfig.USE_PREFIX_LISTING, "true");
+ input.put(DeleteOrphanFilesConfig.PLANNING_WORKER_POOL_SIZE, "4");
+ input.put(DeleteOrphanFilesConfig.EQUAL_SCHEMES, "s3n=s3,s3a=s3");
+ input.put(DeleteOrphanFilesConfig.EQUAL_AUTHORITIES, "auth1=auth2");
+ input.put(DeleteOrphanFilesConfig.PREFIX_MISMATCH_MODE, "IGNORE");
+ input.put("other.config", "should-be-ignored");
+ }
+
+ @AfterEach
+ public void after() {
+ input.clear();
+ }
+
+ @Test
+ void testConfigParsing() {
+    DeleteOrphanFilesConfig config = new DeleteOrphanFilesConfig(table, input, new Configuration());
+
+ assertThat(config.scheduleOnIntervalSecond()).isEqualTo(60);
+ assertThat(config.minAgeSeconds()).isEqualTo(86400L);
+ assertThat(config.deleteBatchSize()).isEqualTo(500);
+ assertThat(config.location()).isEqualTo("/tmp/test-location");
+ assertThat(config.usePrefixListing()).isTrue();
+ assertThat(config.planningWorkerPoolSize()).isEqualTo(4);
+    assertThat(config.equalSchemes()).containsEntry("s3n", "s3").containsEntry("s3a", "s3");
+ assertThat(config.equalAuthorities()).containsEntry("auth1", "auth2");
+    assertThat(config.prefixMismatchMode()).isEqualTo(PrefixMismatchMode.IGNORE);
+ }
+
+ @Test
+ void testConfigDefaults() {
+ DeleteOrphanFilesConfig config =
+        new DeleteOrphanFilesConfig(table, Maps.newHashMap(), new Configuration());
+
+ assertThat(config.scheduleOnIntervalSecond())
+        .isEqualTo(DeleteOrphanFilesConfig.SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue());
+ assertThat(config.minAgeSeconds())
+        .isEqualTo(DeleteOrphanFilesConfig.MIN_AGE_SECONDS_OPTION.defaultValue());
+ assertThat(config.deleteBatchSize())
+        .isEqualTo(DeleteOrphanFilesConfig.DELETE_BATCH_SIZE_OPTION.defaultValue());
+ assertThat(config.location()).isNull();
+ assertThat(config.usePrefixListing()).isTrue();
+    assertThat(config.planningWorkerPoolSize()).isEqualTo(ThreadPools.WORKER_THREAD_POOL_SIZE);
+    assertThat(config.equalSchemes()).containsEntry("s3n", "s3").containsEntry("s3a", "s3");
+ assertThat(config.equalAuthorities()).isEqualTo(Map.of());
+    assertThat(config.prefixMismatchMode()).isEqualTo(PrefixMismatchMode.ERROR);
+ }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
new file mode 100644
index 0000000000..3bcec8114b
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestExpireSnapshotsConfig extends OperatorTestBase {
+ private Table table;
+ private Map<String, String> input = Maps.newHashMap();
+
+ @BeforeEach
+ public void before() {
+ this.table = createTable();
+ input.put(ExpireSnapshotsConfig.SCHEDULE_ON_COMMIT_COUNT, "10");
+ input.put(ExpireSnapshotsConfig.SCHEDULE_ON_INTERVAL_SECOND, "60");
+ input.put(ExpireSnapshotsConfig.MAX_SNAPSHOT_AGE_SECONDS, "7200");
+ input.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+ input.put(ExpireSnapshotsConfig.DELETE_BATCH_SIZE, "500");
+ input.put(ExpireSnapshotsConfig.CLEAN_EXPIRED_METADATA, "true");
+ input.put(ExpireSnapshotsConfig.PLANNING_WORKER_POOL_SIZE, "4");
+ input.put("other.config", "should-be-ignored");
+ }
+
+ @AfterEach
+ public void after() {
+ input.clear();
+ }
+
+ @Test
+ void testConfigParsing() {
+    ExpireSnapshotsConfig config = new ExpireSnapshotsConfig(table, input, new Configuration());
+
+ assertThat(config.scheduleOnCommitCount()).isEqualTo(10);
+ assertThat(config.scheduleOnIntervalSecond()).isEqualTo(60);
+ assertThat(config.maxSnapshotAgeSeconds()).isEqualTo(7200L);
+ assertThat(config.retainLast()).isEqualTo(5);
+ assertThat(config.deleteBatchSize()).isEqualTo(500);
+ assertThat(config.cleanExpiredMetadata()).isTrue();
+ assertThat(config.planningWorkerPoolSize()).isEqualTo(4);
+ }
+
+ @Test
+ void testConfigDefaults() {
+ ExpireSnapshotsConfig config =
+        new ExpireSnapshotsConfig(table, Maps.newHashMap(), new Configuration());
+
+ assertThat(config.scheduleOnCommitCount())
+        .isEqualTo(ExpireSnapshotsConfig.SCHEDULE_ON_COMMIT_COUNT_OPTION.defaultValue());
+ assertThat(config.scheduleOnIntervalSecond())
+        .isEqualTo(ExpireSnapshotsConfig.SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue());
+ assertThat(config.maxSnapshotAgeSeconds()).isNull();
+ assertThat(config.retainLast()).isNull();
+ assertThat(config.deleteBatchSize())
+        .isEqualTo(ExpireSnapshotsConfig.DELETE_BATCH_SIZE_OPTION.defaultValue());
+ assertThat(config.cleanExpiredMetadata()).isTrue();
+    assertThat(config.planningWorkerPoolSize()).isEqualTo(ThreadPools.WORKER_THREAD_POOL_SIZE);
+ }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
similarity index 53%
rename from flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
rename to flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
index 8042fe6e0f..5c926d7c25 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
@@ -42,6 +42,8 @@ import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
import org.apache.iceberg.flink.maintenance.api.LockConfig;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
@@ -51,7 +53,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;
-class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
+class TestIcebergSinkTableMaintenance extends TestFlinkIcebergSinkBase {
  private static final String[] LOCK_TYPES = new String[] {LockConfig.JdbcLockConfig.JDBC, ""};
private Map<String, String> flinkConf;
@@ -59,9 +61,6 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
@BeforeEach
void before() throws IOException {
this.flinkConf = Maps.newHashMap();
- flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
- flinkConf.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
-    flinkConf.put(RewriteDataFilesConfig.PREFIX + SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
table =
CATALOG_EXTENSION
@@ -84,6 +83,7 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
@FieldSource("LOCK_TYPES")
public void testCompactFileE2e(String lockType) throws Exception {
setupLockConfig(lockType);
+ setupCompactionConfig();
    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
@@ -124,6 +124,7 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
@FieldSource("LOCK_TYPES")
public void testTableMaintenanceOperatorAdded(String lockType) {
setupLockConfig(lockType);
+ setupCompactionConfig();
    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
@@ -147,6 +148,22 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
assertThat(containRewrite).isTrue();
}
+ private void setupCompactionConfig() {
+ flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
+ flinkConf.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
+    flinkConf.put(RewriteDataFilesConfig.PREFIX + SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
+ }
+
+ private void setupExpireSnapshotsConfig() {
+ flinkConf.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), "true");
+ flinkConf.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+ }
+
+ private void setupDeleteOrphanFilesConfig() {
+ flinkConf.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), "true");
+ flinkConf.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+ }
+
private void setupLockConfig(String lockType) {
if (lockType.equals(LockConfig.JdbcLockConfig.JDBC)) {
      flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), LockConfig.JdbcLockConfig.JDBC);
@@ -159,4 +176,146 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), "");
}
}
+
+ @ParameterizedTest(name = "lockType = {0}")
+ @FieldSource("LOCK_TYPES")
+ public void testExpireSnapshotsEnabled(String lockType) {
+ setupLockConfig(lockType);
+ setupExpireSnapshotsConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ IcebergSink.forRowData(dataStream)
+ .table(table)
+ .tableLoader(tableLoader)
+ .setAll(flinkConf)
+ .append();
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ boolean containExpire = false;
+ for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+ if (vertex.getName().contains("Expire")) {
+ containExpire = true;
+ break;
+ }
+ }
+
+ assertThat(containExpire).isTrue();
+ }
+
+ @ParameterizedTest(name = "lockType = {0}")
+ @FieldSource("LOCK_TYPES")
+ public void testDeleteOrphanFilesEnabled(String lockType) {
+ setupLockConfig(lockType);
+ setupDeleteOrphanFilesConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ IcebergSink.forRowData(dataStream)
+ .table(table)
+ .tableLoader(tableLoader)
+ .setAll(flinkConf)
+ .append();
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ boolean containOrphan = false;
+ for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+ if (vertex.getName().contains("Orphan")) {
+ containOrphan = true;
+ break;
+ }
+ }
+
+ assertThat(containOrphan).isTrue();
+ }
+
+ @ParameterizedTest(name = "lockType = {0}")
+ @FieldSource("LOCK_TYPES")
+ public void testAllMaintenanceTasksCombined(String lockType) {
+ setupLockConfig(lockType);
+ setupCompactionConfig();
+ setupExpireSnapshotsConfig();
+ setupDeleteOrphanFilesConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ IcebergSink.forRowData(dataStream)
+ .table(table)
+ .tableLoader(tableLoader)
+ .setAll(flinkConf)
+ .append();
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ boolean containRewrite = false;
+ boolean containExpire = false;
+ boolean containOrphan = false;
+ for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+ if (vertex.getName().contains("Rewrite")) {
+ containRewrite = true;
+ }
+
+ if (vertex.getName().contains("Expire")) {
+ containExpire = true;
+ }
+
+ if (vertex.getName().contains("Orphan")) {
+ containOrphan = true;
+ }
+ }
+
+ assertThat(containRewrite).isTrue();
+ assertThat(containExpire).isTrue();
+ assertThat(containOrphan).isTrue();
+ }
+
+ @ParameterizedTest(name = "lockType = {0}")
+ @FieldSource("LOCK_TYPES")
+ public void testAllMaintenanceE2e(String lockType) throws Exception {
+ setupLockConfig(lockType);
+
+ Map<String, String> compactionConfig = Maps.newHashMap();
+    compactionConfig.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
+ compactionConfig.put(
+        RewriteDataFilesConfig.PREFIX + SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
+
+ Map<String, String> expireConfig = Maps.newHashMap();
+ expireConfig.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+
+ Map<String, String> orphanConfig = Maps.newHashMap();
+ orphanConfig.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ IcebergSink.forRowData(dataStream)
+ .setAll(flinkConf)
+ .table(table)
+ .tableLoader(tableLoader)
+ .rewriteDataFiles(compactionConfig)
+ .expireSnapshots(expireConfig)
+ .deleteOrphanFiles(orphanConfig)
+ .append();
+
+ env.execute("Test All Maintenance E2E");
+
+ table.refresh();
+ // Compaction should have merged the 3 data files into 1
+    List<DataFile> afterCompactDataFiles = getDataFiles(table.currentSnapshot(), table);
+ assertThat(afterCompactDataFiles).hasSize(1);
+
+ List<DataFile> preCompactDataFiles =
+        getDataFiles(table.snapshot(table.currentSnapshot().parentId()), table);
+ assertThat(preCompactDataFiles).hasSize(3);
+ }
}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 66fd098077..990d23f2aa 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -220,6 +220,24 @@ public class FlinkWriteConf {
    return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
}
+ public boolean expireSnapshotsMode() {
+ return confParser
+ .booleanConf()
+ .option(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key())
+ .flinkConfig(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE)
+ .defaultValue(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.defaultValue())
+ .parse();
+ }
+
+ public boolean deleteOrphanFilesMode() {
+ return confParser
+ .booleanConf()
+ .option(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key())
+ .flinkConfig(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE)
+        .defaultValue(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.defaultValue())
+ .parse();
+ }
+
public boolean compactMode() {
return confParser
.booleanConf()
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index e68e64ac57..ee2aeaa450 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -23,6 +23,9 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
+import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
/** Flink sink write options */
@@ -82,7 +85,18 @@ public class FlinkWriteOptions {
ConfigOptions.key("write-parallelism").intType().noDefaultValue();
public static final ConfigOption<Boolean> COMPACTION_ENABLE =
-      ConfigOptions.key("compaction-enabled").booleanType().defaultValue(false);
+ ConfigOptions.key(RewriteDataFilesConfig.PREFIX + "enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDeprecatedKeys("compaction-enabled");
+
+ public static final ConfigOption<Boolean> EXPIRE_SNAPSHOTS_ENABLE =
+      ConfigOptions.key(ExpireSnapshotsConfig.PREFIX + "enabled").booleanType().defaultValue(false);
+
+ public static final ConfigOption<Boolean> DELETE_ORPHAN_FILES_ENABLE =
+ ConfigOptions.key(DeleteOrphanFilesConfig.PREFIX + "enabled")
+ .booleanType()
+ .defaultValue(false);
@Experimental
public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
index 2fce5e0b3e..63a267d16e 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
@@ -53,6 +53,11 @@ public class DeleteOrphanFiles {
ScanContext.builder().streaming(true).project(FILE_PATH_SCHEMA).build();
private static final Splitter COMMA_SPLITTER = Splitter.on(",");
+ static final Map<String, String> DEFAULT_EQUAL_SCHEMES =
+ ImmutableMap.of(
+ "s3n", "s3",
+ "s3a", "s3");
+
@Internal
public static final OutputTag<Exception> ERROR_STREAM =
new OutputTag<>("error-stream", TypeInformation.of(Exception.class));
@@ -79,12 +84,8 @@ public class DeleteOrphanFiles {
private Duration minAge = Duration.ofDays(3);
private int planningWorkerPoolSize = ThreadPools.WORKER_THREAD_POOL_SIZE;
private int deleteBatchSize = 1000;
- private boolean usePrefixListing = false;
- private Map<String, String> equalSchemes =
- Maps.newHashMap(
- ImmutableMap.of(
- "s3n", "s3",
- "s3a", "s3"));
+ private boolean usePrefixListing = true;
+ private Map<String, String> equalSchemes = Maps.newHashMap(DEFAULT_EQUAL_SCHEMES);
private final Map<String, String> equalAuthorities = Maps.newHashMap();
private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
@@ -189,6 +190,19 @@ public class DeleteOrphanFiles {
return this;
}
+ public Builder config(DeleteOrphanFilesConfig deleteOrphanFilesConfig) {
+ return this.scheduleOnInterval(
+ Duration.ofSeconds(deleteOrphanFilesConfig.scheduleOnIntervalSecond()))
+ .minAge(Duration.ofSeconds(deleteOrphanFilesConfig.minAgeSeconds()))
+ .deleteBatchSize(deleteOrphanFilesConfig.deleteBatchSize())
+ .usePrefixListing(deleteOrphanFilesConfig.usePrefixListing())
+ .prefixMismatchMode(deleteOrphanFilesConfig.prefixMismatchMode())
+ .location(deleteOrphanFilesConfig.location())
+ .planningWorkerPoolSize(deleteOrphanFilesConfig.planningWorkerPoolSize())
+ .equalSchemes(deleteOrphanFilesConfig.equalSchemes())
+ .equalAuthorities(deleteOrphanFilesConfig.equalAuthorities());
+ }
+
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
tableLoader().open();
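
[Editor's note, not part of the patch] The new config(...) hook bulk-applies a DeleteOrphanFilesConfig; later setter calls still win. A hedged sketch, where table, writeOptions, and readableConfig are assumed to come from the surrounding job:

    // Sketch: bulk-apply the parsed config, then override a single knob.
    DeleteOrphanFilesConfig cfg =
        new DeleteOrphanFilesConfig(table, writeOptions, readableConfig);
    DeleteOrphanFiles.Builder builder =
        DeleteOrphanFiles.builder()
            .config(cfg) // applies schedule, minAge, batch size, schemes, ...
            .usePrefixListing(false); // overrides the value applied by config(cfg)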
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
new file mode 100644
index 0000000000..af34735781
--- /dev/null
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.flink.FlinkConfParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+
+public class DeleteOrphanFilesConfig {
+ public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + "delete-orphan-files.";
+
+ private static final Splitter COMMA_SPLITTER = Splitter.on(",");
+ private static final Splitter EQUALS_SPLITTER = Splitter.on("=").limit(2);
+
+ public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + "schedule.interval-second";
+ public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION =
+ ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND)
+ .longType()
+ .defaultValue(60 * 60L) // Default is 1 hour
+ .withDescription(
+ "The time interval (in seconds) between two consecutive delete
orphan files operations.");
+
+ public static final String MIN_AGE_SECONDS = PREFIX + "min-age-seconds";
+ public static final ConfigOption<Long> MIN_AGE_SECONDS_OPTION =
+ ConfigOptions.key(MIN_AGE_SECONDS)
+ .longType()
+ .defaultValue(3L * 24 * 60 * 60) // Default is 3 days
+ .withDescription(
+ "The minimum age (in seconds) of files to be considered for
deletion. "
+ + "Files newer than this will not be removed.");
+
+ public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size";
+ public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION =
+ ConfigOptions.key(DELETE_BATCH_SIZE)
+ .intType()
+ .defaultValue(1000)
+ .withDescription("The batch size used for deleting orphan files.");
+
+ public static final String LOCATION = PREFIX + "location";
+ public static final ConfigOption<String> LOCATION_OPTION =
+ ConfigOptions.key(LOCATION)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The location to start recursive listing of candidate files for
removal. "
+ + "By default, the table location is used.");
+
+ public static final String USE_PREFIX_LISTING = PREFIX + "use-prefix-listing";
+ public static final ConfigOption<Boolean> USE_PREFIX_LISTING_OPTION =
+ ConfigOptions.key(USE_PREFIX_LISTING)
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to use prefix listing when listing files from the file
system.");
+
+ public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + "planning-worker-pool-size";
+ public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION =
+ ConfigOptions.key(PLANNING_WORKER_POOL_SIZE)
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The worker pool size used for planning the scan of the
ALL_FILES table. "
+ + "If not set, the shared worker pool is used.");
+
+ public static final String EQUAL_SCHEMES = PREFIX + "equal-schemes";
+ public static final ConfigOption<String> EQUAL_SCHEMES_OPTION =
+ ConfigOptions.key(EQUAL_SCHEMES)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Schemes that should be considered equal, in the format
'scheme1=scheme2,scheme3=scheme4'.");
+
+ public static final String EQUAL_AUTHORITIES = PREFIX + "equal-authorities";
+ public static final ConfigOption<String> EQUAL_AUTHORITIES_OPTION =
+ ConfigOptions.key(EQUAL_AUTHORITIES)
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Authorities that should be considered equal, in the format
'auth1=auth2,auth3=auth4'.");
+
+ public static final String PREFIX_MISMATCH_MODE = PREFIX + "prefix-mismatch-mode";
+ public static final ConfigOption<String> PREFIX_MISMATCH_MODE_OPTION =
+ ConfigOptions.key(PREFIX_MISMATCH_MODE)
+ .stringType()
+ .defaultValue(PrefixMismatchMode.ERROR.name())
+ .withDescription(
+ "Action behavior when location prefixes (schemes/authorities)
mismatch. "
+ + "Valid values: ERROR, IGNORE, DELETE.");
+
+ private final FlinkConfParser confParser;
+
+ public DeleteOrphanFilesConfig(
+ Table table, Map<String, String> writeOptions, ReadableConfig readableConfig) {
+ this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+ }
+
+ public long scheduleOnIntervalSecond() {
+ return confParser
+ .longConf()
+ .option(SCHEDULE_ON_INTERVAL_SECOND)
+ .flinkConfig(SCHEDULE_ON_INTERVAL_SECOND_OPTION)
+ .defaultValue(SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue())
+ .parse();
+ }
+
+ public long minAgeSeconds() {
+ return confParser
+ .longConf()
+ .option(MIN_AGE_SECONDS)
+ .flinkConfig(MIN_AGE_SECONDS_OPTION)
+ .defaultValue(MIN_AGE_SECONDS_OPTION.defaultValue())
+ .parse();
+ }
+
+ public int deleteBatchSize() {
+ return confParser
+ .intConf()
+ .option(DELETE_BATCH_SIZE)
+ .flinkConfig(DELETE_BATCH_SIZE_OPTION)
+ .defaultValue(DELETE_BATCH_SIZE_OPTION.defaultValue())
+ .parse();
+ }
+
+ public String location() {
+ return confParser.stringConf().option(LOCATION).flinkConfig(LOCATION_OPTION).parseOptional();
+ }
+
+ public boolean usePrefixListing() {
+ return confParser
+ .booleanConf()
+ .option(USE_PREFIX_LISTING)
+ .flinkConfig(USE_PREFIX_LISTING_OPTION)
+ .defaultValue(USE_PREFIX_LISTING_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Integer planningWorkerPoolSize() {
+ return confParser
+ .intConf()
+ .option(PLANNING_WORKER_POOL_SIZE)
+ .flinkConfig(PLANNING_WORKER_POOL_SIZE_OPTION)
+ .defaultValue(ThreadPools.WORKER_THREAD_POOL_SIZE)
+ .parse();
+ }
+
+ public Map<String, String> equalSchemes() {
+ String equalSchemes =
+ confParser
+ .stringConf()
+ .option(EQUAL_SCHEMES)
+ .flinkConfig(EQUAL_SCHEMES_OPTION)
+ .parseOptional();
+
+ return equalSchemes != null
+ ? parseKeyValuePairs(equalSchemes)
+ : Maps.newHashMap(DeleteOrphanFiles.DEFAULT_EQUAL_SCHEMES);
+ }
+
+ public Map<String, String> equalAuthorities() {
+ String equalAuthorities =
+ confParser
+ .stringConf()
+ .option(EQUAL_AUTHORITIES)
+ .flinkConfig(EQUAL_AUTHORITIES_OPTION)
+ .parseOptional();
+
+ return equalAuthorities != null ? parseKeyValuePairs(equalAuthorities) : Map.of();
+ }
+
+ public PrefixMismatchMode prefixMismatchMode() {
+ String value =
+ confParser
+ .stringConf()
+ .option(PREFIX_MISMATCH_MODE)
+ .flinkConfig(PREFIX_MISMATCH_MODE_OPTION)
+ .defaultValue(PREFIX_MISMATCH_MODE_OPTION.defaultValue())
+ .parse();
+ return PrefixMismatchMode.valueOf(value);
+ }
+
+ private static Map<String, String> parseKeyValuePairs(String value) {
+ Map<String, String> result = Maps.newHashMap();
+ for (String pair : COMMA_SPLITTER.split(value)) {
+ List<String> parts = EQUALS_SPLITTER.splitToList(pair);
+ Preconditions.checkArgument(parts.size() == 2, "Invalid key-value pair:
%s", pair);
+ result.put(parts.get(0).trim(), parts.get(1).trim());
+ }
+
+ return result;
+ }
+}
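
[Editor's note, not part of the patch] The equal-schemes and equal-authorities options are comma-separated key=value strings, split by the COMMA_SPLITTER/EQUALS_SPLITTER fields above. A hedged sketch (the table variable is assumed):

    // Sketch: "s3n=s3,s3a=s3" parses to {s3n=s3, s3a=s3}; a pair without '='
    // trips the Preconditions check in parseKeyValuePairs.
    Map<String, String> opts = Maps.newHashMap();
    opts.put(DeleteOrphanFilesConfig.EQUAL_SCHEMES, "s3n=s3,s3a=s3");
    DeleteOrphanFilesConfig cfg =
        new DeleteOrphanFilesConfig(table, opts, new Configuration());
    Map<String, String> schemes = cfg.equalSchemes(); // {s3n=s3, s3a=s3}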
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index 628a911414..c84932f96f 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.maintenance.api;
import java.time.Duration;
+import java.util.Optional;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -107,6 +108,19 @@ public class ExpireSnapshots {
return this;
}
+ public Builder config(ExpireSnapshotsConfig expireSnapshotsConfig) {
+ return this.scheduleOnCommitCount(expireSnapshotsConfig.scheduleOnCommitCount())
+ .scheduleOnInterval(Duration.ofSeconds(expireSnapshotsConfig.scheduleOnIntervalSecond()))
+ .deleteBatchSize(expireSnapshotsConfig.deleteBatchSize())
+ .maxSnapshotAge(
+ Optional.ofNullable(expireSnapshotsConfig.maxSnapshotAgeSeconds())
+ .map(Duration::ofSeconds)
+ .orElse(null))
+ .retainLast(expireSnapshotsConfig.retainLast())
+ .cleanExpiredMetadata(expireSnapshotsConfig.cleanExpiredMetadata())
+ .planningWorkerPoolSize(expireSnapshotsConfig.planningWorkerPoolSize());
+ }
+
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
Preconditions.checkNotNull(tableLoader(), "TableLoader should not be
null");
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
new file mode 100644
index 0000000000..13436975e1
--- /dev/null
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkConfParser;
+import org.apache.iceberg.util.ThreadPools;
+
+public class ExpireSnapshotsConfig {
+ public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + "expire-snapshots.";
+
+ public static final String SCHEDULE_ON_COMMIT_COUNT = PREFIX + "schedule.commit-count";
+ public static final ConfigOption<Integer> SCHEDULE_ON_COMMIT_COUNT_OPTION =
+ ConfigOptions.key(SCHEDULE_ON_COMMIT_COUNT)
+ .intType()
+ .defaultValue(10)
+ .withDescription(
+ "The number of commits after which to trigger a new expire
snapshots operation.");
+
+ public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + "schedule.interval-second";
+ public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION =
+ ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND)
+ .longType()
+ .defaultValue(60 * 60L) // Default is 1 hour
+ .withDescription(
+ "The time interval (in seconds) between two consecutive expire
snapshots operations.");
+
+ public static final String MAX_SNAPSHOT_AGE_SECONDS = PREFIX + "max-snapshot-age-seconds";
+ public static final ConfigOption<Long> MAX_SNAPSHOT_AGE_SECONDS_OPTION =
+ ConfigOptions.key(MAX_SNAPSHOT_AGE_SECONDS)
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "The maximum age (in seconds) of snapshots to retain. "
+ + "Snapshots older than this will be expired.");
+
+ public static final String RETAIN_LAST = PREFIX + "retain-last";
+ public static final ConfigOption<Integer> RETAIN_LAST_OPTION =
+ ConfigOptions.key(RETAIN_LAST)
+ .intType()
+ .noDefaultValue()
+ .withDescription("The minimum number of snapshots to retain.");
+
+ public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size";
+ public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION =
+ ConfigOptions.key(DELETE_BATCH_SIZE)
+ .intType()
+ .defaultValue(1000)
+ .withDescription("The batch size used for deleting expired files.");
+
+ public static final String CLEAN_EXPIRED_METADATA = PREFIX + "clean-expired-metadata";
+ public static final ConfigOption<Boolean> CLEAN_EXPIRED_METADATA_OPTION =
+ ConfigOptions.key(CLEAN_EXPIRED_METADATA)
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to clean expired metadata such as partition specs and
schemas.");
+
+ public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + "planning-worker-pool-size";
+ public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION =
+ ConfigOptions.key(PLANNING_WORKER_POOL_SIZE)
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "The worker pool size used to calculate the files to delete. "
+ + "If not set, the shared worker pool is used.");
+
+ private final FlinkConfParser confParser;
+
+ public ExpireSnapshotsConfig(
+ Table table, Map<String, String> writeOptions, ReadableConfig readableConfig) {
+ this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+ }
+
+ public int scheduleOnCommitCount() {
+ return confParser
+ .intConf()
+ .option(SCHEDULE_ON_COMMIT_COUNT)
+ .flinkConfig(SCHEDULE_ON_COMMIT_COUNT_OPTION)
+ .defaultValue(SCHEDULE_ON_COMMIT_COUNT_OPTION.defaultValue())
+ .parse();
+ }
+
+ public long scheduleOnIntervalSecond() {
+ return confParser
+ .longConf()
+ .option(SCHEDULE_ON_INTERVAL_SECOND)
+ .flinkConfig(SCHEDULE_ON_INTERVAL_SECOND_OPTION)
+ .defaultValue(SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Long maxSnapshotAgeSeconds() {
+ return confParser
+ .longConf()
+ .option(MAX_SNAPSHOT_AGE_SECONDS)
+ .flinkConfig(MAX_SNAPSHOT_AGE_SECONDS_OPTION)
+ .parseOptional();
+ }
+
+ public Integer retainLast() {
+ return confParser.intConf().option(RETAIN_LAST).flinkConfig(RETAIN_LAST_OPTION).parseOptional();
+ }
+
+ public int deleteBatchSize() {
+ return confParser
+ .intConf()
+ .option(DELETE_BATCH_SIZE)
+ .flinkConfig(DELETE_BATCH_SIZE_OPTION)
+ .defaultValue(DELETE_BATCH_SIZE_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Boolean cleanExpiredMetadata() {
+ return confParser
+ .booleanConf()
+ .option(CLEAN_EXPIRED_METADATA)
+ .flinkConfig(CLEAN_EXPIRED_METADATA_OPTION)
+ .defaultValue(CLEAN_EXPIRED_METADATA_OPTION.defaultValue())
+ .parse();
+ }
+
+ public Integer planningWorkerPoolSize() {
+ return confParser
+ .intConf()
+ .option(PLANNING_WORKER_POOL_SIZE)
+ .flinkConfig(PLANNING_WORKER_POOL_SIZE_OPTION)
+ .defaultValue(ThreadPools.WORKER_THREAD_POOL_SIZE)
+ .parse();
+ }
+}
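
[Editor's note, not part of the patch] Every getter here goes through FlinkConfParser, whose usual resolution order is per-sink write option first, then table property, then the Flink ReadableConfig, then the option default. A hedged sketch of that precedence (table assumed; the order is an assumption based on FlinkConfParser's existing behavior):

    // Sketch: the write option shadows the Flink config value.
    Map<String, String> writeOptions = Maps.newHashMap();
    writeOptions.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
    Configuration flinkConf = new Configuration();
    flinkConf.set(ExpireSnapshotsConfig.RETAIN_LAST_OPTION, 10);
    ExpireSnapshotsConfig cfg = new ExpireSnapshotsConfig(table, writeOptions, flinkConf);
    int retained = cfg.retainLast(); // 5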
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
index 0c88abf820..e6f536273e 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
@@ -122,6 +122,14 @@ public class FlinkMaintenanceConfig {
return new RewriteDataFilesConfig(table, writeProperties, readableConfig);
}
+ public ExpireSnapshotsConfig createExpireSnapshotsConfig() {
+ return new ExpireSnapshotsConfig(table, writeProperties, readableConfig);
+ }
+
+ public DeleteOrphanFilesConfig createDeleteOrphanFilesConfig() {
+ return new DeleteOrphanFilesConfig(table, writeProperties, readableConfig);
+ }
+
public LockConfig createLockConfig() {
return new LockConfig(table, writeProperties, readableConfig);
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index ab5159be12..d98a1c9ab4 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.maintenance.api;
import java.io.IOException;
import java.time.Duration;
+import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
@@ -252,6 +253,17 @@ public class TableMaintenance {
return this;
}
+ /**
+ * Adds multiple maintenance tasks, each with its configured schedule.
+ *
+ * @param tasks the maintenance task builders to add
+ */
+ public Builder add(Collection<MaintenanceTaskBuilder<?>> tasks) {
+ Preconditions.checkNotNull(tasks, "Tasks collection should not be null");
+ taskBuilders.addAll(tasks);
+ return this;
+ }
+
/** Builds the task graph for the maintenance tasks. */
public void append() throws IOException {
Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least
one task");
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
index c05e7d9180..8e45a2db30 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
@@ -78,7 +78,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
private int maxContinuousEmptyCommits;
private ExecutorService workerPool;
private int continuousEmptyCheckpoints = 0;
- private boolean compactMode = false;
+ private final boolean tableMaintenanceEnabled;
IcebergCommitter(
TableLoader tableLoader,
@@ -88,7 +88,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
int workerPoolSize,
String sinkId,
IcebergFilesCommitterMetrics committerMetrics,
- boolean compactMode) {
+ boolean tableMaintenanceEnabled) {
this.branch = branch;
this.snapshotProperties = snapshotProperties;
this.replacePartitions = replacePartitions;
@@ -107,7 +107,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
ThreadPools.newFixedThreadPool(
"iceberg-committer-pool-" + table.name() + "-" + sinkId,
workerPoolSize);
this.continuousEmptyCheckpoints = 0;
- this.compactMode = compactMode;
+ this.tableMaintenanceEnabled = tableMaintenanceEnabled;
}
@Override
@@ -177,7 +177,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
committerMetrics.updateCommitSummary(summary);
}
- if (!compactMode) {
+ if (!tableMaintenanceEnabled) {
FlinkManifestUtil.deleteCommittedManifests(table, manifests, newFlinkJobId, checkpointId);
}
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 96c947a853..9f90d8fd35 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -73,8 +73,13 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
import org.apache.iceberg.flink.maintenance.api.FlinkMaintenanceConfig;
import org.apache.iceberg.flink.maintenance.api.LockConfig;
+import org.apache.iceberg.flink.maintenance.api.MaintenanceTaskBuilder;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
@@ -88,6 +93,7 @@ import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -159,14 +165,15 @@ public class IcebergSink
private final String branch;
private final boolean overwriteMode;
private final int workerPoolSize;
- private final boolean compactMode;
- private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
-
+ private final boolean maintenanceEnabled;
private final Table table;
// This should only be used for logging/error messages. For any actual logic always use
// equalityFieldIds instead.
private final Set<String> equalityFieldColumns;
+ private final transient List<MaintenanceTaskBuilder<?>> maintenanceTasks;
+ private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
+
private IcebergSink(
TableLoader tableLoader,
Table table,
@@ -179,6 +186,7 @@ public class IcebergSink
Set<Integer> equalityFieldIds,
String branch,
boolean overwriteMode,
+ List<MaintenanceTaskBuilder<?>> maintenanceTasks,
FlinkMaintenanceConfig flinkMaintenanceConfig,
Set<String> equalityFieldColumns) {
this.tableLoader = tableLoader;
@@ -200,7 +208,8 @@ public class IcebergSink
// This is used to separate files generated by different sinks writing the same table.
// Also used to generate the aggregator operator name
this.sinkId = UUID.randomUUID().toString();
- this.compactMode = flinkWriteConf.compactMode();
+ this.maintenanceEnabled = !maintenanceTasks.isEmpty();
+ this.maintenanceTasks = maintenanceTasks;
this.flinkMaintenanceConfig = flinkMaintenanceConfig;
this.equalityFieldColumns = equalityFieldColumns;
}
@@ -238,7 +247,7 @@ public class IcebergSink
workerPoolSize,
sinkId,
metrics,
- compactMode);
+ maintenanceEnabled);
}
@Override
@@ -250,7 +259,7 @@ public class IcebergSink
public void addPostCommitTopology(
DataStream<CommittableMessage<IcebergCommittable>> committables) {
- if (!compactMode) {
+ if (maintenanceTasks.isEmpty()) {
return;
}
@@ -264,26 +273,21 @@ public class IcebergSink
.uid(postCommitUid)
.forceNonParallel();
try {
- RewriteDataFilesConfig rewriteDataFilesConfig =
- flinkMaintenanceConfig.createRewriteDataFilesConfig();
- RewriteDataFiles.Builder rewriteBuilder =
- RewriteDataFiles.builder().config(rewriteDataFilesConfig);
-
LockConfig lockConfig = flinkMaintenanceConfig.createLockConfig();
String tableMaintenanceUid = String.format("TableMaintenance : %s",
suffix);
- TableMaintenance.Builder builder =
- StringUtils.isNotEmpty(lockConfig.lockType())
- ? TableMaintenance.forChangeStream(
- tableChangeStream,
- tableLoader,
- LockFactoryBuilder.build(lockConfig, table.name()))
- .uidSuffix(tableMaintenanceUid)
- .add(rewriteBuilder)
- : TableMaintenance.forChangeStream(tableChangeStream, tableLoader)
- .uidSuffix(tableMaintenanceUid)
- .add(rewriteBuilder);
+
+ TableMaintenance.Builder builder;
+ if (StringUtils.isNotEmpty(lockConfig.lockType())) {
+ builder =
+ TableMaintenance.forChangeStream(
+ tableChangeStream, tableLoader, LockFactoryBuilder.build(lockConfig, table.name()));
+ } else {
+ builder = TableMaintenance.forChangeStream(tableChangeStream, tableLoader);
+ }
builder
+ .uidSuffix(tableMaintenanceUid)
+ .add(maintenanceTasks)
.rateLimit(Duration.ofSeconds(flinkMaintenanceConfig.rateLimit()))
.lockCheckDelay(Duration.ofSeconds(flinkMaintenanceConfig.lockCheckDelay()))
.slotSharingGroup(flinkMaintenanceConfig.slotSharingGroup())
@@ -344,6 +348,7 @@ public class IcebergSink
private final Map<String, String> snapshotSummary = Maps.newHashMap();
private ReadableConfig readableConfig = new Configuration();
private List<String> equalityFieldColumns = null;
+ private final List<MaintenanceTaskBuilder<?>> maintenanceTasks = Lists.newArrayList();
private Builder() {}
@@ -626,6 +631,85 @@ public class IcebergSink
return this;
}
+ /**
+ * Enables or disables compaction (rewriting data files) as a post-commit maintenance task.
+ *
+ * @param enabled whether to enable compaction
+ * @see RewriteDataFilesConfig for the default config.
+ * @deprecated See {@code rewriteDataFiles(..)}
+ */
+ @Deprecated
+ public Builder compaction(boolean enabled) {
+ writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), Boolean.toString(enabled));
+ return this;
+ }
+
+ /**
+ * Enables rewriting data files (compaction) as a post-commit maintenance task.
+ *
+ * @see RewriteDataFilesConfig for the default config.
+ */
+ public Builder rewriteDataFiles() {
+ writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
+ return this;
+ }
+
+ /**
+ * Enables rewriting data files (compaction) as a post-commit maintenance task.
+ *
+ * @param config task-specific configuration, see {@link RewriteDataFilesConfig} for available
+ * keys
+ */
+ public Builder rewriteDataFiles(Map<String, String> config) {
+ rewriteDataFiles();
+ writeOptions.putAll(config);
+ return this;
+ }
+
+ /**
+ * Enables expire snapshots as a post-commit maintenance task.
+ *
+ * @see ExpireSnapshotsConfig for the default config.
+ */
+ public Builder expireSnapshots() {
+ writeOptions.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), "true");
+ return this;
+ }
+
+ /**
+ * Enables expire snapshots as a post-commit maintenance task.
+ *
+ * @param config task-specific configuration, see {@link ExpireSnapshotsConfig} for available
+ * keys
+ */
+ public Builder expireSnapshots(Map<String, String> config) {
+ expireSnapshots();
+ writeOptions.putAll(config);
+ return this;
+ }
+
+ /**
+ * Enables delete orphan files as a post-commit maintenance task.
+ *
+ * @see DeleteOrphanFilesConfig for the default config.
+ */
+ public Builder deleteOrphanFiles() {
+ writeOptions.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), "true");
+ return this;
+ }
+
+ /**
+ * Enables delete orphan files as a post-commit maintenance task.
+ *
+ * @param config task-specific configuration, see {@link DeleteOrphanFilesConfig} for available
+ * keys.
+ */
+ public Builder deleteOrphanFiles(Map<String, String> config) {
+ deleteOrphanFiles();
+ writeOptions.putAll(config);
+ return this;
+ }
+
@Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
@@ -682,6 +766,24 @@ public class IcebergSink
FlinkMaintenanceConfig flinkMaintenanceConfig =
new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
+ if (flinkWriteConf.compactMode()) {
+ RewriteDataFilesConfig rewriteDataFilesConfig =
+ flinkMaintenanceConfig.createRewriteDataFilesConfig();
+ maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
+ }
+
+ if (flinkWriteConf.expireSnapshotsMode()) {
+ ExpireSnapshotsConfig expireSnapshotsConfig =
+ flinkMaintenanceConfig.createExpireSnapshotsConfig();
+ maintenanceTasks.add(ExpireSnapshots.builder().config(expireSnapshotsConfig));
+ }
+
+ if (flinkWriteConf.deleteOrphanFilesMode()) {
+ DeleteOrphanFilesConfig deleteOrphanFilesConfig =
+ flinkMaintenanceConfig.createDeleteOrphanFilesConfig();
+ maintenanceTasks.add(DeleteOrphanFiles.builder().config(deleteOrphanFilesConfig));
+ }
+
Set<String> equalityFieldColumnsSet =
equalityFieldColumns != null ? Sets.newHashSet(equalityFieldColumns) : null;
@@ -699,6 +801,7 @@ public class IcebergSink
equalityFieldIds,
flinkWriteConf.branch(),
overwriteMode,
+ maintenanceTasks,
flinkMaintenanceConfig,
equalityFieldColumnsSet);
}
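
[Editor's note, not part of the patch] A hedged sketch of the new builder surface, mirroring the E2E test at the end of this patch (dataStream, table, and tableLoader assumed):

    // Sketch: enable all three post-commit maintenance tasks on the sink.
    IcebergSink.forRowData(dataStream)
        .table(table)
        .tableLoader(tableLoader)
        .rewriteDataFiles() // compaction with default RewriteDataFilesConfig
        .expireSnapshots(Map.of(ExpireSnapshotsConfig.RETAIN_LAST, "5"))
        .deleteOrphanFiles(Map.of(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400"))
        .append();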
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
new file mode 100644
index 0000000000..f26f608ffa
--- /dev/null
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestDeleteOrphanFilesConfig extends OperatorTestBase {
+ private Table table;
+ private Map<String, String> input = Maps.newHashMap();
+
+ @BeforeEach
+ public void before() {
+ this.table = createTable();
+ input.put(DeleteOrphanFilesConfig.SCHEDULE_ON_INTERVAL_SECOND, "60");
+ input.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+ input.put(DeleteOrphanFilesConfig.DELETE_BATCH_SIZE, "500");
+ input.put(DeleteOrphanFilesConfig.LOCATION, "/tmp/test-location");
+ input.put(DeleteOrphanFilesConfig.USE_PREFIX_LISTING, "true");
+ input.put(DeleteOrphanFilesConfig.PLANNING_WORKER_POOL_SIZE, "4");
+ input.put(DeleteOrphanFilesConfig.EQUAL_SCHEMES, "s3n=s3,s3a=s3");
+ input.put(DeleteOrphanFilesConfig.EQUAL_AUTHORITIES, "auth1=auth2");
+ input.put(DeleteOrphanFilesConfig.PREFIX_MISMATCH_MODE, "IGNORE");
+ input.put("other.config", "should-be-ignored");
+ }
+
+ @AfterEach
+ public void after() {
+ input.clear();
+ }
+
+ @Test
+ void testConfigParsing() {
+ DeleteOrphanFilesConfig config = new DeleteOrphanFilesConfig(table, input, new Configuration());
+
+ assertThat(config.scheduleOnIntervalSecond()).isEqualTo(60);
+ assertThat(config.minAgeSeconds()).isEqualTo(86400L);
+ assertThat(config.deleteBatchSize()).isEqualTo(500);
+ assertThat(config.location()).isEqualTo("/tmp/test-location");
+ assertThat(config.usePrefixListing()).isTrue();
+ assertThat(config.planningWorkerPoolSize()).isEqualTo(4);
+ assertThat(config.equalSchemes()).containsEntry("s3n",
"s3").containsEntry("s3a", "s3");
+ assertThat(config.equalAuthorities()).containsEntry("auth1", "auth2");
+ assertThat(config.prefixMismatchMode()).isEqualTo(PrefixMismatchMode.IGNORE);
+ }
+
+ @Test
+ void testConfigDefaults() {
+ DeleteOrphanFilesConfig config =
+ new DeleteOrphanFilesConfig(table, Maps.newHashMap(), new Configuration());
+
+ assertThat(config.scheduleOnIntervalSecond())
+ .isEqualTo(DeleteOrphanFilesConfig.SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue());
+ assertThat(config.minAgeSeconds())
+ .isEqualTo(DeleteOrphanFilesConfig.MIN_AGE_SECONDS_OPTION.defaultValue());
+ assertThat(config.deleteBatchSize())
+ .isEqualTo(DeleteOrphanFilesConfig.DELETE_BATCH_SIZE_OPTION.defaultValue());
+ assertThat(config.location()).isNull();
+ assertThat(config.usePrefixListing()).isTrue();
+ assertThat(config.planningWorkerPoolSize()).isEqualTo(ThreadPools.WORKER_THREAD_POOL_SIZE);
+ assertThat(config.equalSchemes()).containsEntry("s3n", "s3").containsEntry("s3a", "s3");
+ assertThat(config.equalAuthorities()).isEqualTo(Map.of());
+ assertThat(config.prefixMismatchMode()).isEqualTo(PrefixMismatchMode.ERROR);
+ }
+}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
new file mode 100644
index 0000000000..3bcec8114b
--- /dev/null
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestExpireSnapshotsConfig extends OperatorTestBase {
+ private Table table;
+ private Map<String, String> input = Maps.newHashMap();
+
+ @BeforeEach
+ public void before() {
+ this.table = createTable();
+ input.put(ExpireSnapshotsConfig.SCHEDULE_ON_COMMIT_COUNT, "10");
+ input.put(ExpireSnapshotsConfig.SCHEDULE_ON_INTERVAL_SECOND, "60");
+ input.put(ExpireSnapshotsConfig.MAX_SNAPSHOT_AGE_SECONDS, "7200");
+ input.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+ input.put(ExpireSnapshotsConfig.DELETE_BATCH_SIZE, "500");
+ input.put(ExpireSnapshotsConfig.CLEAN_EXPIRED_METADATA, "true");
+ input.put(ExpireSnapshotsConfig.PLANNING_WORKER_POOL_SIZE, "4");
+ input.put("other.config", "should-be-ignored");
+ }
+
+ @AfterEach
+ public void after() {
+ input.clear();
+ }
+
+ @Test
+ void testConfigParsing() {
+ ExpireSnapshotsConfig config = new ExpireSnapshotsConfig(table, input, new Configuration());
+
+ assertThat(config.scheduleOnCommitCount()).isEqualTo(10);
+ assertThat(config.scheduleOnIntervalSecond()).isEqualTo(60);
+ assertThat(config.maxSnapshotAgeSeconds()).isEqualTo(7200L);
+ assertThat(config.retainLast()).isEqualTo(5);
+ assertThat(config.deleteBatchSize()).isEqualTo(500);
+ assertThat(config.cleanExpiredMetadata()).isTrue();
+ assertThat(config.planningWorkerPoolSize()).isEqualTo(4);
+ }
+
+ @Test
+ void testConfigDefaults() {
+ ExpireSnapshotsConfig config =
+ new ExpireSnapshotsConfig(table, Maps.newHashMap(), new Configuration());
+
+ assertThat(config.scheduleOnCommitCount())
+ .isEqualTo(ExpireSnapshotsConfig.SCHEDULE_ON_COMMIT_COUNT_OPTION.defaultValue());
+ assertThat(config.scheduleOnIntervalSecond())
+ .isEqualTo(ExpireSnapshotsConfig.SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue());
+ assertThat(config.maxSnapshotAgeSeconds()).isNull();
+ assertThat(config.retainLast()).isNull();
+ assertThat(config.deleteBatchSize())
+ .isEqualTo(ExpireSnapshotsConfig.DELETE_BATCH_SIZE_OPTION.defaultValue());
+ assertThat(config.cleanExpiredMetadata()).isTrue();
+ assertThat(config.planningWorkerPoolSize()).isEqualTo(ThreadPools.WORKER_THREAD_POOL_SIZE);
+ }
+}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
similarity index 53%
rename from
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
rename to
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
index 8042fe6e0f..5c926d7c25 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
@@ -42,6 +42,8 @@ import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
import org.apache.iceberg.flink.maintenance.api.LockConfig;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
@@ -51,7 +53,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;
-class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
+class TestIcebergSinkTableMaintenance extends TestFlinkIcebergSinkBase {
private static final String[] LOCK_TYPES = new String[] {LockConfig.JdbcLockConfig.JDBC, ""};
private Map<String, String> flinkConf;
@@ -59,9 +61,6 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
@BeforeEach
void before() throws IOException {
this.flinkConf = Maps.newHashMap();
- flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
- flinkConf.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
- flinkConf.put(RewriteDataFilesConfig.PREFIX + SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
table =
CATALOG_EXTENSION
@@ -84,6 +83,7 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
@FieldSource("LOCK_TYPES")
public void testCompactFileE2e(String lockType) throws Exception {
setupLockConfig(lockType);
+ setupCompactionConfig();
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
@@ -124,6 +124,7 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
@FieldSource("LOCK_TYPES")
public void testTableMaintenanceOperatorAdded(String lockType) {
setupLockConfig(lockType);
+ setupCompactionConfig();
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =
env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
@@ -147,6 +148,22 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
assertThat(containRewrite).isTrue();
}
+ private void setupCompactionConfig() {
+ flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
+ flinkConf.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
+ flinkConf.put(RewriteDataFilesConfig.PREFIX + SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
+ }
+
+ private void setupExpireSnapshotsConfig() {
+ flinkConf.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), "true");
+ flinkConf.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+ }
+
+ private void setupDeleteOrphanFilesConfig() {
+ flinkConf.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), "true");
+ flinkConf.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+ }
+
private void setupLockConfig(String lockType) {
if (lockType.equals(LockConfig.JdbcLockConfig.JDBC)) {
flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), LockConfig.JdbcLockConfig.JDBC);
@@ -159,4 +176,146 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), "");
}
}
+
+ @ParameterizedTest(name = "lockType = {0}")
+ @FieldSource("LOCK_TYPES")
+ public void testExpireSnapshotsEnabled(String lockType) {
+ setupLockConfig(lockType);
+ setupExpireSnapshotsConfig();
+
+ List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+ .map(CONVERTER::toInternal,
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ IcebergSink.forRowData(dataStream)
+ .table(table)
+ .tableLoader(tableLoader)
+ .setAll(flinkConf)
+ .append();
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ boolean containExpire = false;
+ for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+ if (vertex.getName().contains("Expire")) {
+ containExpire = true;
+ break;
+ }
+ }
+
+ assertThat(containExpire).isTrue();
+ }
+
+ @ParameterizedTest(name = "lockType = {0}")
+ @FieldSource("LOCK_TYPES")
+ public void testDeleteOrphanFilesEnabled(String lockType) {
+ setupLockConfig(lockType);
+ setupDeleteOrphanFilesConfig();
+
+ List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+ .map(CONVERTER::toInternal,
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ IcebergSink.forRowData(dataStream)
+ .table(table)
+ .tableLoader(tableLoader)
+ .setAll(flinkConf)
+ .append();
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ boolean containOrphan = false;
+ for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+ if (vertex.getName().contains("Orphan")) {
+ containOrphan = true;
+ break;
+ }
+ }
+
+ assertThat(containOrphan).isTrue();
+ }
+
+ @ParameterizedTest(name = "lockType = {0}")
+ @FieldSource("LOCK_TYPES")
+ public void testAllMaintenanceTasksCombined(String lockType) {
+ setupLockConfig(lockType);
+ setupCompactionConfig();
+ setupExpireSnapshotsConfig();
+ setupDeleteOrphanFilesConfig();
+
+ List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+ .map(CONVERTER::toInternal,
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ IcebergSink.forRowData(dataStream)
+ .table(table)
+ .tableLoader(tableLoader)
+ .setAll(flinkConf)
+ .append();
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ boolean containRewrite = false;
+ boolean containExpire = false;
+ boolean containOrphan = false;
+ for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+ if (vertex.getName().contains("Rewrite")) {
+ containRewrite = true;
+ }
+
+ if (vertex.getName().contains("Expire")) {
+ containExpire = true;
+ }
+
+ if (vertex.getName().contains("Orphan")) {
+ containOrphan = true;
+ }
+ }
+
+ assertThat(containRewrite).isTrue();
+ assertThat(containExpire).isTrue();
+ assertThat(containOrphan).isTrue();
+ }
+
+ @ParameterizedTest(name = "lockType = {0}")
+ @FieldSource("LOCK_TYPES")
+ public void testAllMaintenanceE2e(String lockType) throws Exception {
+ setupLockConfig(lockType);
+
+ Map<String, String> compactionConfig = Maps.newHashMap();
+ compactionConfig.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
+ compactionConfig.put(
+ RewriteDataFilesConfig.PREFIX + SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
+
+ Map<String, String> expireConfig = Maps.newHashMap();
+ expireConfig.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+
+ Map<String, String> orphanConfig = Maps.newHashMap();
+ orphanConfig.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+
+ List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2,
"world"), Row.of(3, "foo"));
+ DataStream<RowData> dataStream =
+ env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+ .map(CONVERTER::toInternal,
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+ IcebergSink.forRowData(dataStream)
+ .setAll(flinkConf)
+ .table(table)
+ .tableLoader(tableLoader)
+ .rewriteDataFiles(compactionConfig)
+ .expireSnapshots(expireConfig)
+ .deleteOrphanFiles(orphanConfig)
+ .append();
+
+ env.execute("Test All Maintenance E2E");
+
+ table.refresh();
+ // Compaction should have merged the 3 data files into 1
+ List<DataFile> afterCompactDataFiles = getDataFiles(table.currentSnapshot(), table);
+ assertThat(afterCompactDataFiles).hasSize(1);
+
+ List<DataFile> preCompactDataFiles =
+ getDataFiles(table.snapshot(table.currentSnapshot().parentId()), table);
+ assertThat(preCompactDataFiles).hasSize(3);
+ assertThat(preCompactDataFiles).hasSize(3);
+ }
}