This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new d4302267f [AMORO-3790] Add flink checkpoint retention configuration (#3814)
d4302267f is described below

commit d4302267ffe91920b032a3f4292c99c44a2f4996
Author: Xu Bai <[email protected]>
AuthorDate: Tue Oct 14 17:29:04 2025 +0800

    [AMORO-3790] Add flink checkpoint retention configuration (#3814)
    
    * Add flink checkpoint retention configuration
    
    * Add documentation and comments
    
    * spotless
---
 .../maintainer/IcebergTableMaintainer.java         | 28 +++++++++++++++-------
 .../amoro/server/table/TableConfigurations.java    |  7 ++++++
 .../apache/amoro/config/TableConfiguration.java    | 12 ++++++++++
 .../org/apache/amoro/table/TableProperties.java    | 14 +++++++++++
 docs/user-guides/configurations.md                 |  1 +
 5 files changed, 53 insertions(+), 9 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
index 707df4666..e30c17ac7 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -332,15 +332,25 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
   }
 
   protected long mustOlderThan(DefaultTableRuntime tableRuntime, long now) {
-    return min(
-        // The snapshots keep time
-        now - snapshotsKeepTime(tableRuntime),
-        // The snapshot optimizing plan based should not be expired for 
committing
-        fetchOptimizingPlanSnapshotTime(table, tableRuntime),
-        // The latest non-optimized snapshot should not be expired for data 
expiring
-        fetchLatestNonOptimizedSnapshotTime(table),
-        // The latest flink committed snapshot should not be expired for 
recovering flink job
-        fetchLatestFlinkCommittedSnapshotTime(table));
+    long mustOlderThan =
+        min(
+            // The snapshots keep time
+            now - snapshotsKeepTime(tableRuntime),
+            // The snapshot optimizing plan based should not be expired for 
committing
+            fetchOptimizingPlanSnapshotTime(table, tableRuntime),
+            // The latest non-optimized snapshot should not be expired for 
data expiring
+            fetchLatestNonOptimizedSnapshotTime(table));
+
+    long latestFlinkCommitTime = fetchLatestFlinkCommittedSnapshotTime(table);
+    long flinkCkRetainMillis = 
tableRuntime.getTableConfiguration().getFlinkCheckpointRetention();
+    if ((now - latestFlinkCommitTime) > flinkCkRetainMillis) {
+      // exceed configured flink checkpoint retain time, no need to consider 
flink committed
+      // snapshot
+      return mustOlderThan;
+    } else {
+      // keep at least flink committed snapshot for flink job recovering
+      return min(mustOlderThan, latestFlinkCommitTime);
+    }
   }
 
   protected long snapshotsKeepTime(DefaultTableRuntime tableRuntime) {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index 7838f7c98..9b469bd32 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -75,6 +75,13 @@ public class TableConfigurations {
                 properties,
                 TableProperties.SNAPSHOT_MIN_COUNT,
                 TableProperties.SNAPSHOT_MIN_COUNT_DEFAULT))
+        .setFlinkCheckpointRetention(
+            ConfigHelpers.TimeUtils.parseDuration(
+                    CompatiblePropertyUtil.propertyAsString(
+                        properties,
+                        TableProperties.SNAPSHOT_FLINK_CHECKPOINT_RETENTION,
+                        
TableProperties.SNAPSHOT_FLINK_CHECKPOINT_RETENTION_DEFAULT))
+                .toMillis())
         .setChangeDataTTLMinutes(
             CompatiblePropertyUtil.propertyAsLong(
                 properties,
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java 
b/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java
index 5efedfbbe..08d700d1e 100644
--- a/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java
+++ b/amoro-common/src/main/java/org/apache/amoro/config/TableConfiguration.java
@@ -28,6 +28,7 @@ public class TableConfiguration {
   private boolean expireSnapshotEnabled;
   private long snapshotTTLMinutes;
   private int snapshotMinCount;
+  private long flinkCheckpointRetention;
   private long changeDataTTLMinutes;
   private boolean cleanOrphanEnabled;
   private long orphanExistingMinutes;
@@ -86,6 +87,15 @@ public class TableConfiguration {
     return this;
   }
 
+  public long getFlinkCheckpointRetention() {
+    return flinkCheckpointRetention;
+  }
+
+  public TableConfiguration setFlinkCheckpointRetention(long 
flinkCheckpointRetention) {
+    this.flinkCheckpointRetention = flinkCheckpointRetention;
+    return this;
+  }
+
   public TableConfiguration setChangeDataTTLMinutes(long changeDataTTLMinutes) 
{
     this.changeDataTTLMinutes = changeDataTTLMinutes;
     return this;
@@ -141,6 +151,7 @@ public class TableConfiguration {
     return expireSnapshotEnabled == that.expireSnapshotEnabled
         && snapshotTTLMinutes == that.snapshotTTLMinutes
         && snapshotMinCount == that.snapshotMinCount
+        && flinkCheckpointRetention == that.flinkCheckpointRetention
         && changeDataTTLMinutes == that.changeDataTTLMinutes
         && cleanOrphanEnabled == that.cleanOrphanEnabled
         && orphanExistingMinutes == that.orphanExistingMinutes
@@ -156,6 +167,7 @@ public class TableConfiguration {
         expireSnapshotEnabled,
         snapshotTTLMinutes,
         snapshotMinCount,
+        flinkCheckpointRetention,
         changeDataTTLMinutes,
         cleanOrphanEnabled,
         orphanExistingMinutes,
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 6d7a436c1..8252147b3 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -147,6 +147,20 @@ public class TableProperties {
   public static final String SNAPSHOT_MIN_COUNT = "snapshot.keep.min-count";
   public static final int SNAPSHOT_MIN_COUNT_DEFAULT = 1;
 
+  /**
+   * Retention period for the latest snapshot committed by a Flink checkpoint. Within this period
+   * that snapshot is protected from expiration so the Flink job can recover from it; once it is
+   * older, it may be expired. Avoid long retention: the snapshot may reference old data files.
+   *
+   * <p>Format: A duration string, e.g., "7d" for 7 days or "12h" for 12 hours.
+   *
+   * <p>Default: "7d" (7 days)
+   */
+  public static final String SNAPSHOT_FLINK_CHECKPOINT_RETENTION =
+      "snapshot.keep.flink.checkpoint-retention";
+
+  public static final String SNAPSHOT_FLINK_CHECKPOINT_RETENTION_DEFAULT = 
"7d"; // 7 Days
+
   public static final String ENABLE_ORPHAN_CLEAN = "clean-orphan-file.enabled";
   public static final boolean ENABLE_ORPHAN_CLEAN_DEFAULT = false;
 
diff --git a/docs/user-guides/configurations.md 
b/docs/user-guides/configurations.md
index 05b767b6d..1895812ce 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -73,6 +73,7 @@ Data-cleaning configurations are applicable to both Iceberg 
Format and Mixed str
 | change.data.ttl.minutes                     | 10080(7 days)    | Time to 
live in minutes for data of ChangeStore                                         
                                                                                
                                                                                
              |
 | snapshot.keep.duration                      | 720min(12 hours) | 
Table-Expiration keeps the latest snapshots within a specified duration         
                                                                                
                                                                                
                      |
 | snapshot.keep.min-count                     | 1                | Minimum 
number of snapshots retained for table expiration                               
                                                                                
                                                                                
              |
+| snapshot.keep.flink.checkpoint-retention    | 7d(7 days)       | How long the latest snapshot committed by a Flink checkpoint is protected from expiration so that Flink jobs can recover from it. Once that snapshot is older than this duration, it may be expired like any other snapshot. The value is a duration string (e.g., "7d", "168h", "10080min") |
 | clean-orphan-file.enabled                   | false            | Enables 
periodically clean orphan files                                                 
                                                                                
                                                                                
              |
 | clean-orphan-file.min-existing-time-minutes | 2880(2 days)     | Cleaning 
orphan files keeps the files modified within a specified time in minutes        
                                                                                
                                                                                
             |
 | clean-dangling-delete-files.enabled         | true             | Whether to 
enable cleaning of dangling delete files                                        
                                                                                
                                                                                
           |

Reply via email to