This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2504732549 [core] Fix race condition for earliest snapshot (#4930)
2504732549 is described below

commit 2504732549660e7ed7948f88aa255bc4f7710ebb
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Feb 12 22:26:45 2025 +0800

    [core] Fix race condition for earliest snapshot (#4930)
---
 .../src/main/java/org/apache/paimon/Changelog.java |  11 ++
 .../org/apache/paimon/utils/SnapshotManager.java   | 103 +++++++++++----
 .../apache/paimon/utils/SnapshotManagerTest.java   | 145 +++++++++++++++++----
 3 files changed, 212 insertions(+), 47 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
index 8c6295b44c..79c65ba570 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonPro
 
 import javax.annotation.Nullable;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Map;
 
@@ -105,9 +106,19 @@ public class Changelog extends Snapshot {
     }
 
     public static Changelog fromPath(FileIO fileIO, Path path) {
+        try {
+            return tryFromPath(fileIO, path);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException("Fails to read changelog from path " + 
path, e);
+        }
+    }
+
+    public static Changelog tryFromPath(FileIO fileIO, Path path) throws 
FileNotFoundException {
         try {
             String json = fileIO.readFileUtf8(path);
             return Changelog.fromJson(json);
+        } catch (FileNotFoundException e) {
+            throw e;
         } catch (IOException e) {
             throw new RuntimeException("Fails to read changelog from path " + 
path, e);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 5257cf1c12..8eee46d9a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -168,6 +168,11 @@ public class SnapshotManager implements Serializable {
         return Changelog.fromPath(fileIO, changelogPath);
     }
 
+    private Changelog tryGetChangelog(long snapshotId) throws 
FileNotFoundException {
+        Path changelogPath = longLivedChangelogPath(snapshotId);
+        return Changelog.tryFromPath(fileIO, changelogPath);
+    }
+
     public Changelog longLivedChangelog(long snapshotId) {
         return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
     }
@@ -216,8 +221,41 @@ public class SnapshotManager implements Serializable {
     }
 
     public @Nullable Snapshot earliestSnapshot() {
-        Long snapshotId = earliestSnapshotId();
-        return snapshotId == null ? null : snapshot(snapshotId);
+        return earliestSnapshot(false);
+    }
+
+    private @Nullable Snapshot earliestSnapshot(boolean includeChangelog) {
+        Long snapshotId = null;
+        if (includeChangelog) {
+            snapshotId = earliestLongLivedChangelogId();
+        }
+        if (snapshotId == null) {
+            snapshotId = earliestSnapshotId();
+        }
+        if (snapshotId == null) {
+            return null;
+        }
+
+        FunctionWithException<Long, Snapshot, FileNotFoundException> 
snapshotFunction =
+                includeChangelog ? this::tryGetChangelogOrSnapshot : 
this::tryGetSnapshot;
+
+        // The loss of the earliest snapshot is an event of small probability, 
so the retry number
+        // here need not be too large.
+        int retry = 0;
+        do {
+            try {
+                return snapshotFunction.apply(snapshotId);
+            } catch (FileNotFoundException e) {
+                if (retry++ >= 3) {
+                    throw new RuntimeException(e);
+                }
+                LOG.warn(
+                        "The earliest snapshot or changelog was once 
identified but disappeared. "
+                                + "It might have been expired by other jobs 
operating on this table. "
+                                + "Searching for the second earliest snapshot 
or changelog instead. ");
+                snapshotId++;
+            }
+        } while (true);
     }
 
     public @Nullable Long earliestSnapshotId() {
@@ -276,28 +314,34 @@ public class SnapshotManager implements Serializable {
         }
     }
 
+    private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws 
FileNotFoundException {
+        if (longLivedChangelogExists(snapshotId)) {
+            return tryGetChangelog(snapshotId);
+        } else {
+            return tryGetSnapshot(snapshotId);
+        }
+    }
+
     /**
      * Returns the latest snapshot earlier than the timestamp mills. A 
non-existent snapshot may be
      * returned if all snapshots are equal to or later than the timestamp 
mills.
      */
     public @Nullable Long earlierThanTimeMills(long timestampMills, boolean 
startFromChangelog) {
-        Long earliestSnapshot = earliestSnapshotId();
-        Long earliest;
-        if (startFromChangelog) {
-            Long earliestChangelog = earliestLongLivedChangelogId();
-            earliest = earliestChangelog == null ? earliestSnapshot : 
earliestChangelog;
-        } else {
-            earliest = earliestSnapshot;
-        }
         Long latest = latestSnapshotId();
-        if (earliest == null || latest == null) {
+        if (latest == null) {
+            return null;
+        }
+
+        Snapshot earliestSnapshot = earliestSnapshot(startFromChangelog);
+        if (earliestSnapshot == null) {
             return null;
         }
 
-        if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
-            return earliest - 1;
+        if (earliestSnapshot.timeMillis() >= timestampMills) {
+            return earliestSnapshot.id() - 1;
         }
 
+        long earliest = earliestSnapshot.id();
         while (earliest < latest) {
             long mid = (earliest + latest + 1) / 2;
             if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {
@@ -314,16 +358,17 @@ public class SnapshotManager implements Serializable {
      * mills. If there is no such a snapshot, returns null.
      */
     public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
-        Long earliest = earliestSnapshotId();
         Long latest = latestSnapshotId();
-        if (earliest == null || latest == null) {
+        if (latest == null) {
             return null;
         }
 
-        Snapshot earliestSnapShot = snapshot(earliest);
-        if (earliestSnapShot.timeMillis() > timestampMills) {
+        Snapshot earliestSnapShot = earliestSnapshot();
+        if (earliestSnapShot == null || earliestSnapShot.timeMillis() > 
timestampMills) {
             return earliestSnapShot;
         }
+        long earliest = earliestSnapShot.id();
+
         Snapshot finalSnapshot = null;
         while (earliest <= latest) {
             long mid = earliest + (latest - earliest) / 2; // Avoid overflow
@@ -376,16 +421,22 @@ public class SnapshotManager implements Serializable {
     }
 
     public @Nullable Snapshot earlierOrEqualWatermark(long watermark) {
-        Long earliest = earliestSnapshotId();
         Long latest = latestSnapshotId();
         // If latest == Long.MIN_VALUE don't need next binary search for 
watermark
         // which can reduce IO cost with snapshot
-        if (earliest == null || latest == null || snapshot(latest).watermark() 
== Long.MIN_VALUE) {
+        if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
             return null;
         }
+
+        Snapshot earliestSnapShot = earliestSnapshot();
+        if (earliestSnapShot == null) {
+            return null;
+        }
+        long earliest = earliestSnapShot.id();
+
         Long earliestWatermark = null;
         // find the first snapshot with watermark
-        if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
+        if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
             while (earliest < latest) {
                 earliest++;
                 earliestWatermark = snapshot(earliest).watermark();
@@ -435,16 +486,22 @@ public class SnapshotManager implements Serializable {
     }
 
     public @Nullable Snapshot laterOrEqualWatermark(long watermark) {
-        Long earliest = earliestSnapshotId();
         Long latest = latestSnapshotId();
         // If latest == Long.MIN_VALUE don't need next binary search for 
watermark
         // which can reduce IO cost with snapshot
-        if (earliest == null || latest == null || snapshot(latest).watermark() 
== Long.MIN_VALUE) {
+        if (latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
             return null;
         }
+
+        Snapshot earliestSnapShot = earliestSnapshot();
+        if (earliestSnapShot == null) {
+            return null;
+        }
+        long earliest = earliestSnapShot.id();
+
         Long earliestWatermark = null;
         // find the first snapshot with watermark
-        if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
+        if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
             while (earliest < latest) {
                 earliest++;
                 earliestWatermark = snapshot(earliest).watermark();
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index e828a0c90a..20b4557043 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -27,6 +27,10 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -56,8 +60,42 @@ public class SnapshotManagerTest {
         }
     }
 
-    @Test
-    public void testEarlierThanTimeMillis() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEarliestSnapshot(boolean isRaceCondition) throws 
IOException {
+        long millis = 1684726826L;
+        FileIO localFileIO = LocalFileIO.create();
+        SnapshotManager snapshotManager =
+                new TestSnapshotManager(localFileIO, new 
Path(tempDir.toString()), isRaceCondition);
+        // create 10 snapshots
+        for (long i = 0; i < 10; i++) {
+            Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
+            localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), 
snapshot.toJson());
+        }
+
+        
assertThat(snapshotManager.earliestSnapshot().id()).isEqualTo(isRaceCondition ? 
1 : 0);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEarlierOrEqualWatermark(boolean isRaceCondition) throws 
IOException {
+        long millis = 1684726826L;
+        FileIO localFileIO = LocalFileIO.create();
+        SnapshotManager snapshotManager =
+                new TestSnapshotManager(localFileIO, new 
Path(tempDir.toString()), isRaceCondition);
+        // create 10 snapshots
+        for (long i = 0; i < 10; i++) {
+            Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000, 
millis + i * 1000);
+            localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), 
snapshot.toJson());
+        }
+
+        assertThat(snapshotManager.earlierOrEqualWatermark(millis + 999).id())
+                .isEqualTo(isRaceCondition ? 1 : 0);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEarlierThanTimeMillis(boolean isRaceCondition) throws 
IOException {
         long base = System.currentTimeMillis();
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
@@ -70,7 +108,7 @@ public class SnapshotManagerTest {
 
         FileIO localFileIO = LocalFileIO.create();
         SnapshotManager snapshotManager =
-                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+                new TestSnapshotManager(localFileIO, new 
Path(tempDir.toString()), isRaceCondition);
         int firstSnapshotId = random.nextInt(1, 100);
         for (int i = 0; i < numSnapshots; i++) {
             Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i, 
millis.get(i));
@@ -90,11 +128,24 @@ public class SnapshotManagerTest {
             Long actual = snapshotManager.earlierThanTimeMills(time, false);
 
             if (millis.get(numSnapshots - 1) < time) {
-                assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 
1);
+                if (isRaceCondition && millis.size() == 1) {
+                    assertThat(actual).isNull();
+                } else {
+                    assertThat(actual).isEqualTo(firstSnapshotId + 
numSnapshots - 1);
+                }
             } else {
                 for (int i = 0; i < numSnapshots; i++) {
                     if (millis.get(i) >= time) {
-                        assertThat(actual).isEqualTo(firstSnapshotId + i - 1);
+                        if (isRaceCondition && i == 0) {
+                            // The first snapshot expired during invocation
+                            if (millis.size() == 1) {
+                                assertThat(actual).isNull();
+                            } else {
+                                
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId);
+                            }
+                        } else {
+                            
assertThat(actual).isLessThanOrEqualTo(firstSnapshotId + i - 1);
+                        }
                         break;
                     }
                 }
@@ -102,31 +153,45 @@ public class SnapshotManagerTest {
         }
     }
 
-    @Test
-    public void testEarlierOrEqualTimeMills() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testEarlierOrEqualTimeMills(boolean isRaceCondition) throws 
IOException {
         long millis = 1684726826L;
         FileIO localFileIO = LocalFileIO.create();
         SnapshotManager snapshotManager =
-                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+                new TestSnapshotManager(localFileIO, new 
Path(tempDir.toString()), isRaceCondition);
         // create 10 snapshots
         for (long i = 0; i < 10; i++) {
             Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
             localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), 
snapshot.toJson());
         }
 
-        // there is no snapshot smaller than "millis - 1L" return the earliest 
snapshot
-        assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 
1L).timeMillis())
-                .isEqualTo(millis);
-
-        // smaller than the second snapshot return the first snapshot
-        assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
999).timeMillis())
-                .isEqualTo(millis);
-        // equal to the second snapshot return the second snapshot
-        assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
1000).timeMillis())
-                .isEqualTo(millis + 1000);
-        // larger than the second snapshot return the second snapshot
-        assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
1001).timeMillis())
-                .isEqualTo(millis + 1000);
+        if (isRaceCondition) {
+            // The earliest snapshot has expired, so always return the second 
snapshot
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 
1L).timeMillis())
+                    .isEqualTo(millis + 1000L);
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
999).timeMillis())
+                    .isEqualTo(millis + 1000L);
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
1000).timeMillis())
+                    .isEqualTo(millis + 1000L);
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
1001).timeMillis())
+                    .isEqualTo(millis + 1000L);
+        } else {
+            // there is no snapshot smaller than "millis - 1L" return the 
earliest snapshot
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 
1L).timeMillis())
+                    .isEqualTo(millis);
+
+            // smaller than the second snapshot return the first snapshot
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
999).timeMillis())
+                    .isEqualTo(millis);
+
+            // equal to the second snapshot return the second snapshot
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
1000).timeMillis())
+                    .isEqualTo(millis + 1000);
+            // larger than the second snapshot return the second snapshot
+            assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 
1001).timeMillis())
+                    .isEqualTo(millis + 1000);
+        }
     }
 
     @Test
@@ -154,12 +219,13 @@ public class SnapshotManagerTest {
         assertThat(snapshotManager.laterOrEqualTimeMills(millis + 
10001)).isNull();
     }
 
-    @Test
-    public void testlaterOrEqualWatermark() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLaterOrEqualWatermark(boolean isRaceCondition) throws 
IOException {
         long millis = Long.MIN_VALUE;
         FileIO localFileIO = LocalFileIO.create();
         SnapshotManager snapshotManager =
-                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+                new TestSnapshotManager(localFileIO, new 
Path(tempDir.toString()), isRaceCondition);
         // create 10 snapshots
         for (long i = 0; i < 10; i++) {
             Snapshot snapshot = createSnapshotWithMillis(i, millis, 
Long.MIN_VALUE);
@@ -410,4 +476,35 @@ public class SnapshotManagerTest {
         snapshotManager.commitChangelog(changelog, id);
         assertDoesNotThrow(() -> snapshotManager.commitChangelog(changelog, 
id));
     }
+
+    /**
+     * Test {@link SnapshotManager} to mock situations when there is a race 
condition, that the
+     * earliest snapshot is deleted by another thread in the middle of the 
current thread's
+     * invocation.
+     */
+    private static class TestSnapshotManager extends SnapshotManager {
+        private final boolean isRaceCondition;
+
+        private boolean deleteEarliestSnapshot = false;
+
+        public TestSnapshotManager(FileIO fileIO, Path tablePath, boolean 
isRaceCondition) {
+            super(fileIO, tablePath);
+            this.isRaceCondition = isRaceCondition;
+        }
+
+        @Override
+        public @Nullable Long earliestSnapshotId() {
+            Long snapshotId = super.earliestSnapshotId();
+            if (isRaceCondition && snapshotId != null && 
!deleteEarliestSnapshot) {
+                Path snapshotPath = snapshotPath(snapshotId);
+                try {
+                    fileIO().delete(snapshotPath, true);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                deleteEarliestSnapshot = true;
+            }
+            return snapshotId;
+        }
+    }
 }

Reply via email to