This is an automated email from the ASF dual-hosted git repository.

tsreaper pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 81dd5132ff [core] Check next snapshot's time for snapshotTimeRetain expiration protection (#7706)
81dd5132ff is described below

commit 81dd5132ffe15f5af4a15c958f13fb9f20ee8f5e
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri May 8 13:51:52 2026 +0800

    [core] Check next snapshot's time for snapshotTimeRetain expiration protection (#7706)
---
 .../apache/paimon/table/ExpireSnapshotsImpl.java   | 20 +++--
 .../paimon/operation/ExpireSnapshotsTest.java      | 91 +++++++++++++++++++---
 .../procedure/ExpireSnapshotsProcedureITCase.java  |  8 +-
 3 files changed, 99 insertions(+), 20 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index beeb6ae2b5..ef3796f20d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot;
@@ -63,6 +64,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
     private final TagManager tagManager;
 
     private ExpireConfig expireConfig;
+    private Supplier<Long> currentTimeMillis = System::currentTimeMillis;
 
     public ExpireSnapshotsImpl(
             SnapshotManager snapshotManager,
@@ -82,6 +84,11 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         this.fileExecutor = snapshotDeletion.fileExecutor();
     }
 
+    @VisibleForTesting
+    public void setCurrentTimeMillis(Supplier<Long> currentTimeMillis) {
+        this.currentTimeMillis = currentTimeMillis;
+    }
+
     @Override
     public ExpireSnapshots config(ExpireConfig expireConfig) {
         this.expireConfig = expireConfig;
@@ -95,7 +102,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         int retainMin = expireConfig.getSnapshotRetainMin();
         int maxDeletes = expireConfig.getSnapshotMaxDeletes();
         long olderThanMills =
-                System.currentTimeMillis() - expireConfig.getSnapshotTimeRetain().toMillis();
+                currentTimeMillis.get() - expireConfig.getSnapshotTimeRetain().toMillis();
 
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         if (latestSnapshotId == null) {
@@ -136,10 +143,11 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
 
         for (long id = min; id < maxExclusive; id++) {
             // Early exit the loop for 'snapshot.time-retained'
-            // (the maximum time of snapshots to retain)
+            // A snapshot can only be expired if its next snapshot has been alive
+            // longer than snapshotTimeRetain, providing stronger protection
             try {
-                Snapshot snapshot = snapshotManager.tryGetSnapshot(id);
-                if (olderThanMills <= snapshot.timeMillis()) {
+                Snapshot nextSnapshot = snapshotManager.tryGetSnapshot(id + 1);
+                if (olderThanMills <= nextSnapshot.timeMillis()) {
                     return expireUntil(earliest, id);
                 }
             } catch (FileNotFoundException e) {
@@ -162,7 +170,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
 
     private int innerExpireUntil(long earliestId, long endExclusiveId)
             throws ExecutionException, InterruptedException {
-        long startTime = System.currentTimeMillis();
+        long startTime = currentTimeMillis.get();
 
         if (endExclusiveId <= earliestId) {
             // No expire happens:
@@ -268,7 +276,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
         }
 
         writeEarliestHint(endExclusiveId);
-        long duration = System.currentTimeMillis() - startTime;
+        long duration = currentTimeMillis.get() - startTime;
         LOG.info(
                 "Finished expire snapshots, duration {} ms, range is [{}, {})",
                 duration,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 01e0473e88..0b2b0031c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -44,10 +44,13 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.ExpireSnapshots;
 import org.apache.paimon.table.ExpireSnapshotsImpl;
 import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.RepeatedTest;
@@ -524,31 +527,90 @@ public class ExpireSnapshotsTest {
         builder.snapshotRetainMin(1)
                 .snapshotRetainMax(Integer.MAX_VALUE)
                 .snapshotTimeRetain(Duration.ofMillis(1000));
-        ExpireSnapshots expire = store.newExpire(builder.build());
+        ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(builder.build());
 
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
         commit(5, allData, snapshotPositions);
-        Thread.sleep(1500);
+        for (int i = 1; i <= 5; i++) {
+            rewriteSnapshotTime(i, 0);
+        }
         commit(5, allData, snapshotPositions);
-        long expireMillis = System.currentTimeMillis();
-        // expire twice to check for idempotence
+        for (int i = 6; i <= 10; i++) {
+            rewriteSnapshotTime(i, 2000);
+        }
 
+        // expire at time 2500, olderThanMills = 1500
+        expire.setCurrentTimeMillis(() -> 2500L);
+        // expire twice to check for idempotence
         expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire();
         expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire();
 
         int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue();
-        for (int i = 1; i <= latestSnapshotId; i++) {
-            if (snapshotManager.snapshotExists(i)) {
-                assertThat(snapshotManager.snapshot(i).timeMillis())
-                        .isBetween(expireMillis - 1000, expireMillis);
-                assertSnapshot(i, allData, snapshotPositions);
-            }
+        // snapshots 1-4 should be expired, snapshot 5 is retained because its next
+        // snapshot (6) is within the time window
+        for (int i = 1; i <= 4; i++) {
+            assertThat(snapshotManager.snapshotExists(i)).isFalse();
+        }
+        for (int i = 5; i <= latestSnapshotId; i++) {
+            assertThat(snapshotManager.snapshotExists(i)).isTrue();
+            assertSnapshot(i, allData, snapshotPositions);
         }
 
         store.assertCleaned();
     }
 
+    @Test
+    public void testExpireWithTimeProtectsEachSnapshot() throws Exception {
+        // Even with a small retainMin, each snapshot should be protected by
+        // snapshotTimeRetain: a snapshot can only be expired when its next
+        // snapshot has been alive longer than snapshotTimeRetain.
+        ExpireConfig.Builder builder = ExpireConfig.builder();
+        builder.snapshotRetainMin(1)
+                .snapshotRetainMax(Integer.MAX_VALUE)
+                .snapshotTimeRetain(Duration.ofMillis(5000));
+        ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(builder.build());
+
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+
+        // create 5 snapshots quickly
+        commit(5, allData, snapshotPositions);
+        for (int i = 1; i <= 5; i++) {
+            rewriteSnapshotTime(i, 0);
+        }
+
+        // expire immediately - no snapshot should be expired because each
+        // snapshot's next snapshot is still within the time window
+        expire.setCurrentTimeMillis(() -> 100L);
+        expire.config(builder.build()).expire();
+
+        for (int i = 1; i <= 5; i++) {
+            assertThat(snapshotManager.snapshotExists(i)).isTrue();
+            assertSnapshot(i, allData, snapshotPositions);
+        }
+
+        // create one more snapshot so snapshot 5 has a "next"
+        commit(1, allData, snapshotPositions);
+        rewriteSnapshotTime(6, 6000);
+
+        // expire again - now snapshots 1-4 can be expired (their next snapshots
+        // are older than 5000ms), but snapshot 5 is still protected because its
+        // next snapshot (6) was just created
+        expire.setCurrentTimeMillis(() -> 6500L);
+        expire.config(builder.build()).expire();
+
+        for (int i = 1; i <= 4; i++) {
+            assertThat(snapshotManager.snapshotExists(i)).isFalse();
+        }
+        assertThat(snapshotManager.snapshotExists(5)).isTrue();
+        assertThat(snapshotManager.snapshotExists(6)).isTrue();
+        assertSnapshot(5, allData, snapshotPositions);
+        assertSnapshot(6, allData, snapshotPositions);
+
+        store.assertCleaned();
+    }
+
     @Test
     public void testExpireWithUpgradedFile() throws Exception {
         // write & commit data
@@ -740,6 +802,15 @@ public class ExpireSnapshotsTest {
                 .build();
     }
 
+    private void rewriteSnapshotTime(long snapshotId, long newTimeMillis) throws IOException {
+        String oldJson = fileIO.readFileUtf8(snapshotManager.snapshotPath(snapshotId));
+        ObjectNode node = (ObjectNode) JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(oldJson);
+        node.put("timeMillis", newTimeMillis);
+        String newJson = JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.writeValueAsString(node);
+        fileIO.overwriteFileUtf8(snapshotManager.snapshotPath(snapshotId), newJson);
+        snapshotManager.invalidateCache();
+    }
+
     protected void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions)
             throws Exception {
         for (int i = 0; i < numCommits; i++) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
index bfec37fb0c..0c590510ab 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
@@ -72,10 +72,10 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
                         + "', retain_min => 3)");
         checkSnapshots(snapshotManager, 4, 6);
 
-        // older_than => timestamp of snapshot 6, expected snapshots (6)
+        // older_than => timestamp + 1 of snapshot 6, expected snapshots (6)
         sql(
                 "CALL sys.expire_snapshots(`table`  => 'default.word_count', older_than => '"
-                        + ts6.toString()
+                        + new Timestamp(ts6.getTime() + 1)
                         + "')");
         checkSnapshots(snapshotManager, 6, 6);
     }
@@ -157,7 +157,7 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
                 .run();
         checkSnapshots(snapshotManager, 4, 6);
 
-        // older_than => timestamp of snapshot 6, expected snapshots (6)
+        // older_than => timestamp + 1 of snapshot 6, expected snapshots (6)
         createAction(
                         ExpireSnapshotsAction.class,
                         "expire_snapshots",
@@ -168,7 +168,7 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
                         "--table",
                         "word_count",
                         "--older_than",
-                        ts6.toString(),
+                        new Timestamp(ts6.getTime() + 1).toString(),
                         "--force_start_flink_job",
                         Boolean.toString(forceStartFlinkJob))
                 .withStreamExecutionEnvironment(env)

Reply via email to