This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 18517071a7 [hotfix] Fix earliestSnapshot stability (#5096)
18517071a7 is described below
commit 18517071a777693f3fd9379ad456a6ad804afdd4
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Feb 17 13:28:59 2025 +0800
[hotfix] Fix earliestSnapshot stability (#5096)
---
.../org/apache/paimon/utils/SnapshotManager.java | 29 ++++++++++++----------
.../apache/paimon/utils/SnapshotManagerTest.java | 8 ++++--
2 files changed, 22 insertions(+), 15 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 8eee46d9a8..8b7c91d1f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -71,6 +71,7 @@ public class SnapshotManager implements Serializable {
private static final String CHANGELOG_PREFIX = "changelog-";
public static final String EARLIEST = "EARLIEST";
public static final String LATEST = "LATEST";
+ private static final int EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM = 3;
private static final int READ_HINT_RETRY_NUM = 3;
private static final int READ_HINT_RETRY_INTERVAL = 1;
@@ -221,10 +222,11 @@ public class SnapshotManager implements Serializable {
}
public @Nullable Snapshot earliestSnapshot() {
- return earliestSnapshot(false);
+ return earliestSnapshot(false, null);
}
- private @Nullable Snapshot earliestSnapshot(boolean includeChangelog) {
+ private @Nullable Snapshot earliestSnapshot(
+ boolean includeChangelog, @Nullable Long stopSnapshotId) {
Long snapshotId = null;
if (includeChangelog) {
snapshotId = earliestLongLivedChangelogId();
@@ -236,24 +238,25 @@ public class SnapshotManager implements Serializable {
return null;
}
+ if (stopSnapshotId == null) {
+ stopSnapshotId = snapshotId + EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
+ }
+
FunctionWithException<Long, Snapshot, FileNotFoundException>
snapshotFunction =
includeChangelog ? this::tryGetChangelogOrSnapshot :
this::tryGetSnapshot;
- // The loss of the earliest snapshot is an event of small probability,
so the retry number
- // here need not be too large.
- int retry = 0;
do {
try {
return snapshotFunction.apply(snapshotId);
} catch (FileNotFoundException e) {
- if (retry++ >= 3) {
- throw new RuntimeException(e);
+ snapshotId++;
+ if (snapshotId > stopSnapshotId) {
+ return null;
}
LOG.warn(
"The earliest snapshot or changelog was once
identified but disappeared. "
+ "It might have been expired by other jobs
operating on this table. "
+ "Searching for the second earliest snapshot
or changelog instead. ");
- snapshotId++;
}
} while (true);
}
@@ -332,9 +335,9 @@ public class SnapshotManager implements Serializable {
return null;
}
- Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog);
+ Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog,
latest);
if (earliestSnapshot == null) {
- return null;
+ return latest - 1;
}
if (earliestSnapshot.timeMillis() >= timestampMills) {
@@ -363,7 +366,7 @@ public class SnapshotManager implements Serializable {
return null;
}
- Snapshot earliestSnapShot = earliestSnapshot();
+ Snapshot earliestSnapShot = earliestSnapshot(false, latest);
if (earliestSnapShot == null || earliestSnapShot.timeMillis() >
timestampMills) {
return earliestSnapShot;
}
@@ -428,7 +431,7 @@ public class SnapshotManager implements Serializable {
return null;
}
- Snapshot earliestSnapShot = earliestSnapshot();
+ Snapshot earliestSnapShot = earliestSnapshot(false, latest);
if (earliestSnapShot == null) {
return null;
}
@@ -493,7 +496,7 @@ public class SnapshotManager implements Serializable {
return null;
}
- Snapshot earliestSnapShot = earliestSnapshot();
+ Snapshot earliestSnapShot = earliestSnapshot(false, latest);
if (earliestSnapShot == null) {
return null;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 20b4557043..61f6135b16 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -129,7 +129,11 @@ public class SnapshotManagerTest {
if (millis.get(numSnapshots - 1) < time) {
if (isRaceCondition && millis.size() == 1) {
- assertThat(actual).isNull();
+ if (tries == 0) {
+
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);
+ } else {
+ assertThat(actual).isNull();
+ }
} else {
assertThat(actual).isEqualTo(firstSnapshotId +
numSnapshots - 1);
}
@@ -138,7 +142,7 @@ public class SnapshotManagerTest {
if (millis.get(i) >= time) {
if (isRaceCondition && i == 0) {
// The first snapshot expired during invocation
- if (millis.size() == 1) {
+ if (millis.size() == 1 && tries > 0) {
assertThat(actual).isNull();
} else {
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);