This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new d4302267f [AMORO-3790] Add flink checkpoint retention configuration
(#3814)
d4302267f is described below
commit d4302267ffe91920b032a3f4292c99c44a2f4996
Author: Xu Bai <[email protected]>
AuthorDate: Tue Oct 14 17:29:04 2025 +0800
[AMORO-3790] Add flink checkpoint retention configuration (#3814)
* Add flink checkpoint retention configuration
* Add documentation and comments
* spotless
---
.../maintainer/IcebergTableMaintainer.java | 28 +++++++++++++++-------
.../amoro/server/table/TableConfigurations.java | 7 ++++++
.../apache/amoro/config/TableConfiguration.java | 12 ++++++++++
.../org/apache/amoro/table/TableProperties.java | 14 +++++++++++
docs/user-guides/configurations.md | 1 +
5 files changed, 53 insertions(+), 9 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
index 707df4666..e30c17ac7 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -332,15 +332,25 @@ public class IcebergTableMaintainer implements
TableMaintainer {
}
protected long mustOlderThan(DefaultTableRuntime tableRuntime, long now) {
- return min(
- // The snapshots keep time
- now - snapshotsKeepTime(tableRuntime),
- // The snapshot optimizing plan based should not be expired for
committing
- fetchOptimizingPlanSnapshotTime(table, tableRuntime),
- // The latest non-optimized snapshot should not be expired for data
expiring
- fetchLatestNonOptimizedSnapshotTime(table),
- // The latest flink committed snapshot should not be expired for
recovering flink job
- fetchLatestFlinkCommittedSnapshotTime(table));
+ long mustOlderThan =
+ min(
+ // The snapshots keep time
+ now - snapshotsKeepTime(tableRuntime),
+ // The snapshot optimizing plan based should not be expired for
committing
+ fetchOptimizingPlanSnapshotTime(table, tableRuntime),
+ // The latest non-optimized snapshot should not be expired for
data expiring
+ fetchLatestNonOptimizedSnapshotTime(table));
+
+ long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
+ long flinkCkRetainMillis =
tableRuntime.getTableConfiguration().getFlinkCheckpointRetention();
+ if ((now - latestFlinkCommitTime) > flinkCkRetainMillis) {
+ // The latest Flink commit is older than the configured checkpoint retention,
+ // so it no longer needs to be protected when expiring snapshots.
+ return mustOlderThan;
+ } else {
+ // keep at least flink committed snapshot for flink job recovering
+ return min(mustOlderThan, latestFlinkCommitTime);
+ }
}
protected long snapshotsKeepTime(DefaultTableRuntime tableRuntime) {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index 7838f7c98..9b469bd32 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -75,6 +75,13 @@ public class TableConfigurations {
properties,
TableProperties.SNAPSHOT_MIN_COUNT,
TableProperties.SNAPSHOT_MIN_COUNT_DEFAULT))
+ .setFlinkCheckpointRetention(
+ ConfigHelpers.TimeUtils.parseDuration(
+ CompatiblePropertyUtil.propertyAsString(
+ properties,
+ TableProperties.SNAPSHOT_FLINK_CHECKPOINT_RETENTION,
+
TableProperties.SNAPSHOT_FLINK_CHECKPOINT_RETENTION_DEFAULT))
+ .toMillis())
.setChangeDataTTLMinutes(
CompatiblePropertyUtil.propertyAsLong(
properties,
diff --git
a/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java
b/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java
index 5efedfbbe..08d700d1e 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java
@@ -28,6 +28,7 @@ public class TableConfiguration {
private boolean expireSnapshotEnabled;
private long snapshotTTLMinutes;
private int snapshotMinCount;
+ private long flinkCheckpointRetention;
private long changeDataTTLMinutes;
private boolean cleanOrphanEnabled;
private long orphanExistingMinutes;
@@ -86,6 +87,15 @@ public class TableConfiguration {
return this;
}
+ public long getFlinkCheckpointRetention() {
+ return flinkCheckpointRetention;
+ }
+
+ public TableConfiguration setFlinkCheckpointRetention(long
flinkCheckpointRetention) {
+ this.flinkCheckpointRetention = flinkCheckpointRetention;
+ return this;
+ }
+
public TableConfiguration setChangeDataTTLMinutes(long changeDataTTLMinutes)
{
this.changeDataTTLMinutes = changeDataTTLMinutes;
return this;
@@ -141,6 +151,7 @@ public class TableConfiguration {
return expireSnapshotEnabled == that.expireSnapshotEnabled
&& snapshotTTLMinutes == that.snapshotTTLMinutes
&& snapshotMinCount == that.snapshotMinCount
+ && flinkCheckpointRetention == that.flinkCheckpointRetention
&& changeDataTTLMinutes == that.changeDataTTLMinutes
&& cleanOrphanEnabled == that.cleanOrphanEnabled
&& orphanExistingMinutes == that.orphanExistingMinutes
@@ -156,6 +167,7 @@ public class TableConfiguration {
expireSnapshotEnabled,
snapshotTTLMinutes,
snapshotMinCount,
+ flinkCheckpointRetention,
changeDataTTLMinutes,
cleanOrphanEnabled,
orphanExistingMinutes,
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 6d7a436c1..8252147b3 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -147,6 +147,20 @@ public class TableProperties {
public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;
+ /**
+ * How long the latest Flink-committed snapshot is protected from expiration.
+ * Once it is older than this duration it may be cleaned up. This avoids keeping
+ * the last Flink checkpoint snapshot too long, as it may reference old data files.
+ *
+ * <p>Format: A string representing the duration, e.g., "7d" for 7 days,
"12h" for 12 hours.
+ *
+ * <p>Default: "7d" (7 days)
+ */
+ public static final String SNAPSHOT_FLINK_CHECKPOINT_RETENTION =
+ "snapshot.keep.flink.checkpoint-retention";
+
+ public static final String SNAPSHOT_FLINK_CHECKPOINT_RETENTION_DEFAULT =
"7d"; // 7 Days
+
public static final String ENABLE_ORPHAN_CLEAN = "clean-orphan-file.enabled";
public static final boolean ENABLE_ORPHAN_CLEAN_DEFAULT = false;
diff --git a/docs/user-guides/configurations.md
b/docs/user-guides/configurations.md
index 05b767b6d..1895812ce 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -73,6 +73,7 @@ Data-cleaning configurations are applicable to both Iceberg
Format and Mixed str
| change.data.ttl.minutes | 10080(7 days) | Time to
live in minutes for data of ChangeStore
|
| snapshot.keep.duration | 720min(12 hours) |
Table-Expiration keeps the latest snapshots within a specified duration
|
| snapshot.keep.min-count | 1 | Minimum
number of snapshots retained for table expiration
|
+| snapshot.keep.flink.checkpoint-retention | 7d(7 days) | How long
the latest snapshot committed by Flink is protected from table expiration so
that a Flink job can recover from it. Once that snapshot is older than this
duration it may be cleaned up. The value should be specified as a duration
string (e.g., "7d", "168h", "10080min")
|
| clean-orphan-file.enabled | false | Enables
periodically clean orphan files
|
| clean-orphan-file.min-existing-time-minutes | 2880(2 days) | Cleaning
orphan files keeps the files modified within a specified time in minutes
|
| clean-dangling-delete-files.enabled | true | Whether to
enable cleaning of dangling delete files
|