This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 18517071a7 [hotfix] Fix earliestSnapshot stability (#5096)
18517071a7 is described below

commit 18517071a777693f3fd9379ad456a6ad804afdd4
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Feb 17 13:28:59 2025 +0800

    [hotfix] Fix earliestSnapshot stability (#5096)
---
 .../org/apache/paimon/utils/SnapshotManager.java   | 29 ++++++++++++----------
 .../apache/paimon/utils/SnapshotManagerTest.java   |  8 ++++--
 2 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 8eee46d9a8..8b7c91d1f8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -71,6 +71,7 @@ public class SnapshotManager implements Serializable {
     private static final String CHANGELOG_PREFIX = "changelog-";
     public static final String EARLIEST = "EARLIEST";
     public static final String LATEST = "LATEST";
+    private static final int EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM = 3;
     private static final int READ_HINT_RETRY_NUM = 3;
     private static final int READ_HINT_RETRY_INTERVAL = 1;
 
@@ -221,10 +222,11 @@ public class SnapshotManager implements Serializable {
     }
 
     public @Nullable Snapshot earliestSnapshot() {
-        return earliestSnapshot(false);
+        return earliestSnapshot(false, null);
     }
 
-    private @Nullable Snapshot earliestSnapshot(boolean includeChangelog) {
+    private @Nullable Snapshot earliestSnapshot(
+            boolean includeChangelog, @Nullable Long stopSnapshotId) {
         Long snapshotId = null;
         if (includeChangelog) {
             snapshotId = earliestLongLivedChangelogId();
@@ -236,24 +238,25 @@ public class SnapshotManager implements Serializable {
             return null;
         }
 
+        if (stopSnapshotId == null) {
+            stopSnapshotId = snapshotId + EARLIEST_SNAPSHOT_DEFAULT_RETRY_NUM;
+        }
+
         FunctionWithException<Long, Snapshot, FileNotFoundException> snapshotFunction =
                 includeChangelog ? this::tryGetChangelogOrSnapshot : this::tryGetSnapshot;
 
-        // The loss of the earliest snapshot is an event of small probability, so the retry number
-        // here need not be too large.
-        int retry = 0;
         do {
             try {
                 return snapshotFunction.apply(snapshotId);
             } catch (FileNotFoundException e) {
-                if (retry++ >= 3) {
-                    throw new RuntimeException(e);
+                snapshotId++;
+                if (snapshotId > stopSnapshotId) {
+                    return null;
                 }
                 LOG.warn(
                         "The earliest snapshot or changelog was once identified but disappeared. "
                                 + "It might have been expired by other jobs operating on this table. "
                                 + "Searching for the second earliest snapshot or changelog instead. ");
-                snapshotId++;
             }
         } while (true);
     }
@@ -332,9 +335,9 @@ public class SnapshotManager implements Serializable {
             return null;
         }
 
-        Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog);
+        Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog, latest);
         if (earliestSnapshot == null) {
-            return null;
+            return latest - 1;
         }
 
         if (earliestSnapshot.timeMillis() >= timestampMills) {
@@ -363,7 +366,7 @@ public class SnapshotManager implements Serializable {
             return null;
         }
 
-        Snapshot earliestSnapShot = earliestSnapshot();
+        Snapshot earliestSnapShot = earliestSnapshot(false, latest);
         if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) {
             return earliestSnapShot;
         }
@@ -428,7 +431,7 @@ public class SnapshotManager implements Serializable {
             return null;
         }
 
-        Snapshot earliestSnapShot = earliestSnapshot();
+        Snapshot earliestSnapShot = earliestSnapshot(false, latest);
         if (earliestSnapShot == null) {
             return null;
         }
@@ -493,7 +496,7 @@ public class SnapshotManager implements Serializable {
             return null;
         }
 
-        Snapshot earliestSnapShot = earliestSnapshot();
+        Snapshot earliestSnapShot = earliestSnapshot(false, latest);
         if (earliestSnapShot == null) {
             return null;
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 20b4557043..61f6135b16 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -129,7 +129,11 @@ public class SnapshotManagerTest {
 
             if (millis.get(numSnapshots - 1) < time) {
                 if (isRaceCondition && millis.size() == 1) {
-                    assertThat(actual).isNull();
+                    if (tries == 0) {
+                        assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);
+                    } else {
+                        assertThat(actual).isNull();
+                    }
                 } else {
                     assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1);
                 }
@@ -138,7 +142,7 @@ public class SnapshotManagerTest {
                     if (millis.get(i) >= time) {
                         if (isRaceCondition && i == 0) {
                             // The first snapshot expired during invocation
-                            if (millis.size() == 1) {
+                            if (millis.size() == 1 && tries > 0) {
                                 assertThat(actual).isNull();
                             } else {
                                 assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);

Reply via email to