This is an automated email from the ASF dual-hosted git repository.
tsreaper pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 81dd5132ff [core] Check next snapshot's time for snapshotTimeRetain expiration protection (#7706)
81dd5132ff is described below
commit 81dd5132ffe15f5af4a15c958f13fb9f20ee8f5e
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri May 8 13:51:52 2026 +0800
[core] Check next snapshot's time for snapshotTimeRetain expiration protection (#7706)
---
.../apache/paimon/table/ExpireSnapshotsImpl.java | 20 +++--
.../paimon/operation/ExpireSnapshotsTest.java | 91 +++++++++++++++++++---
.../procedure/ExpireSnapshotsProcedureITCase.java | 8 +-
3 files changed, 99 insertions(+), 20 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index beeb6ae2b5..ef3796f20d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static
org.apache.paimon.utils.SnapshotManager.findPreviousOrEqualSnapshot;
@@ -63,6 +64,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
private final TagManager tagManager;
private ExpireConfig expireConfig;
+ private Supplier<Long> currentTimeMillis = System::currentTimeMillis;
public ExpireSnapshotsImpl(
SnapshotManager snapshotManager,
@@ -82,6 +84,11 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
this.fileExecutor = snapshotDeletion.fileExecutor();
}
+ @VisibleForTesting
+ public void setCurrentTimeMillis(Supplier<Long> currentTimeMillis) {
+ this.currentTimeMillis = currentTimeMillis;
+ }
+
@Override
public ExpireSnapshots config(ExpireConfig expireConfig) {
this.expireConfig = expireConfig;
@@ -95,7 +102,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
int retainMin = expireConfig.getSnapshotRetainMin();
int maxDeletes = expireConfig.getSnapshotMaxDeletes();
long olderThanMills =
- System.currentTimeMillis() -
expireConfig.getSnapshotTimeRetain().toMillis();
+ currentTimeMillis.get() -
expireConfig.getSnapshotTimeRetain().toMillis();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (latestSnapshotId == null) {
@@ -136,10 +143,11 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
for (long id = min; id < maxExclusive; id++) {
// Early exit the loop for 'snapshot.time-retained'
- // (the maximum time of snapshots to retain)
+ // A snapshot can only be expired if its next snapshot has been
alive
+ // longer than snapshotTimeRetain, providing stronger protection
try {
- Snapshot snapshot = snapshotManager.tryGetSnapshot(id);
- if (olderThanMills <= snapshot.timeMillis()) {
+ Snapshot nextSnapshot = snapshotManager.tryGetSnapshot(id + 1);
+ if (olderThanMills <= nextSnapshot.timeMillis()) {
return expireUntil(earliest, id);
}
} catch (FileNotFoundException e) {
@@ -162,7 +170,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
private int innerExpireUntil(long earliestId, long endExclusiveId)
throws ExecutionException, InterruptedException {
- long startTime = System.currentTimeMillis();
+ long startTime = currentTimeMillis.get();
if (endExclusiveId <= earliestId) {
// No expire happens:
@@ -268,7 +276,7 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
}
writeEarliestHint(endExclusiveId);
- long duration = System.currentTimeMillis() - startTime;
+ long duration = currentTimeMillis.get() - startTime;
LOG.info(
"Finished expire snapshots, duration {} ms, range is [{}, {})",
duration,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 01e0473e88..0b2b0031c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -44,10 +44,13 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.ExpireSnapshotsImpl;
import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
@@ -524,31 +527,90 @@ public class ExpireSnapshotsTest {
builder.snapshotRetainMin(1)
.snapshotRetainMax(Integer.MAX_VALUE)
.snapshotTimeRetain(Duration.ofMillis(1000));
- ExpireSnapshots expire = store.newExpire(builder.build());
+ ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl)
store.newExpire(builder.build());
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
commit(5, allData, snapshotPositions);
- Thread.sleep(1500);
+ for (int i = 1; i <= 5; i++) {
+ rewriteSnapshotTime(i, 0);
+ }
commit(5, allData, snapshotPositions);
- long expireMillis = System.currentTimeMillis();
- // expire twice to check for idempotence
+ for (int i = 6; i <= 10; i++) {
+ rewriteSnapshotTime(i, 2000);
+ }
+ // expire at time 2500, olderThanMills = 1500
+ expire.setCurrentTimeMillis(() -> 2500L);
+ // expire twice to check for idempotence
expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire();
expire.config(builder.snapshotTimeRetain(Duration.ofMillis(1000)).build()).expire();
int latestSnapshotId =
requireNonNull(snapshotManager.latestSnapshotId()).intValue();
- for (int i = 1; i <= latestSnapshotId; i++) {
- if (snapshotManager.snapshotExists(i)) {
- assertThat(snapshotManager.snapshot(i).timeMillis())
- .isBetween(expireMillis - 1000, expireMillis);
- assertSnapshot(i, allData, snapshotPositions);
- }
+ // snapshots 1-4 should be expired, snapshot 5 is retained because its
next
+ // snapshot (6) is within the time window
+ for (int i = 1; i <= 4; i++) {
+ assertThat(snapshotManager.snapshotExists(i)).isFalse();
+ }
+ for (int i = 5; i <= latestSnapshotId; i++) {
+ assertThat(snapshotManager.snapshotExists(i)).isTrue();
+ assertSnapshot(i, allData, snapshotPositions);
}
store.assertCleaned();
}
+ @Test
+ public void testExpireWithTimeProtectsEachSnapshot() throws Exception {
+ // Even with a small retainMin, each snapshot should be protected by
+ // snapshotTimeRetain: a snapshot can only be expired when its next
+ // snapshot has been alive longer than snapshotTimeRetain.
+ ExpireConfig.Builder builder = ExpireConfig.builder();
+ builder.snapshotRetainMin(1)
+ .snapshotRetainMax(Integer.MAX_VALUE)
+ .snapshotTimeRetain(Duration.ofMillis(5000));
+ ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl)
store.newExpire(builder.build());
+
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+
+ // create 5 snapshots quickly
+ commit(5, allData, snapshotPositions);
+ for (int i = 1; i <= 5; i++) {
+ rewriteSnapshotTime(i, 0);
+ }
+
+ // expire immediately - no snapshot should be expired because each
+ // snapshot's next snapshot is still within the time window
+ expire.setCurrentTimeMillis(() -> 100L);
+ expire.config(builder.build()).expire();
+
+ for (int i = 1; i <= 5; i++) {
+ assertThat(snapshotManager.snapshotExists(i)).isTrue();
+ assertSnapshot(i, allData, snapshotPositions);
+ }
+
+ // create one more snapshot so snapshot 5 has a "next"
+ commit(1, allData, snapshotPositions);
+ rewriteSnapshotTime(6, 6000);
+
+ // expire again - now snapshots 1-4 can be expired (their next
snapshots
+ // are older than 5000ms), but snapshot 5 is still protected because
its
+ // next snapshot (6) was just created
+ expire.setCurrentTimeMillis(() -> 6500L);
+ expire.config(builder.build()).expire();
+
+ for (int i = 1; i <= 4; i++) {
+ assertThat(snapshotManager.snapshotExists(i)).isFalse();
+ }
+ assertThat(snapshotManager.snapshotExists(5)).isTrue();
+ assertThat(snapshotManager.snapshotExists(6)).isTrue();
+ assertSnapshot(5, allData, snapshotPositions);
+ assertSnapshot(6, allData, snapshotPositions);
+
+ store.assertCleaned();
+ }
+
@Test
public void testExpireWithUpgradedFile() throws Exception {
// write & commit data
@@ -740,6 +802,15 @@ public class ExpireSnapshotsTest {
.build();
}
+ private void rewriteSnapshotTime(long snapshotId, long newTimeMillis)
throws IOException {
+ String oldJson =
fileIO.readFileUtf8(snapshotManager.snapshotPath(snapshotId));
+ ObjectNode node = (ObjectNode)
JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(oldJson);
+ node.put("timeMillis", newTimeMillis);
+ String newJson =
JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.writeValueAsString(node);
+ fileIO.overwriteFileUtf8(snapshotManager.snapshotPath(snapshotId),
newJson);
+ snapshotManager.invalidateCache();
+ }
+
protected void commit(int numCommits, List<KeyValue> allData,
List<Integer> snapshotPositions)
throws Exception {
for (int i = 0; i < numCommits; i++) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
index bfec37fb0c..0c590510ab 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
@@ -72,10 +72,10 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
+ "', retain_min => 3)");
checkSnapshots(snapshotManager, 4, 6);
- // older_than => timestamp of snapshot 6, expected snapshots (6)
+ // older_than => timestamp + 1 of snapshot 6, expected snapshots (6)
sql(
"CALL sys.expire_snapshots(`table` => 'default.word_count',
older_than => '"
- + ts6.toString()
+ + new Timestamp(ts6.getTime() + 1)
+ "')");
checkSnapshots(snapshotManager, 6, 6);
}
@@ -157,7 +157,7 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
.run();
checkSnapshots(snapshotManager, 4, 6);
- // older_than => timestamp of snapshot 6, expected snapshots (6)
+ // older_than => timestamp + 1 of snapshot 6, expected snapshots (6)
createAction(
ExpireSnapshotsAction.class,
"expire_snapshots",
@@ -168,7 +168,7 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
"--table",
"word_count",
"--older_than",
- ts6.toString(),
+ new Timestamp(ts6.getTime() + 1).toString(),
"--force_start_flink_job",
Boolean.toString(forceStartFlinkJob))
.withStreamExecutionEnvironment(env)