This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a3cb728a [core] Avoid FileNotFoundException when looking for the latest snapshot of user (#3906)
0a3cb728a is described below

commit 0a3cb728a2c1f311e7a84bbb1181ca28b7242e1b
Author: rfyu <[email protected]>
AuthorDate: Sun Aug 11 23:24:58 2024 +0800

    [core] Avoid FileNotFoundException when looking for the latest snapshot of user (#3906)
---
 .../org/apache/paimon/utils/SnapshotManager.java   | 33 ++++++++++++++-
 .../apache/paimon/utils/SnapshotManagerTest.java   | 49 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 1b64d8bc8..c5fdb042e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -24,6 +24,9 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -56,6 +59,8 @@ public class SnapshotManager implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
+
     private static final String SNAPSHOT_PREFIX = "snapshot-";
     private static final String CHANGELOG_PREFIX = "changelog-";
     public static final String EARLIEST = "EARLIEST";
@@ -523,7 +528,31 @@ public class SnapshotManager implements Serializable {
                         "Latest snapshot id is not null, but earliest snapshot id is null. "
                                 + "This is unexpected.");
         for (long id = latestId; id >= earliestId; id--) {
-            Snapshot snapshot = snapshot(id);
+            Snapshot snapshot;
+            try {
+                snapshot = snapshot(id);
+            } catch (Exception e) {
+                long newEarliestId =
+                        Preconditions.checkNotNull(
+                                earliestSnapshotId(),
+                                "Latest snapshot id is not null, but earliest snapshot id is null. "
+                                        + "This is unexpected.");
+
+                // this is a valid snapshot, should throw exception
+                if (id >= newEarliestId) {
+                    throw e;
+                }
+
+                // this is an expired snapshot
+                LOG.warn(
+                        "Snapshot #"
+                                + id
+                                + " is expired. The latest snapshot of current user("
+                                + user
+                                + ") is not found.");
+                break;
+            }
+
             if (user.equals(snapshot.commitUser())) {
                 return Optional.of(snapshot);
             }
@@ -594,7 +623,7 @@ public class SnapshotManager implements Serializable {
                     return null;
                 }
 
-                // this is a valid snapshot, should not throw exception
+                // this is a valid snapshot, should throw exception
                 if (id >= newEarliestId) {
                     throw e;
                 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index b1e038480..4e0b18d47 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -199,6 +199,55 @@ public class SnapshotManagerTest {
                         null));
     }
 
+    @Test
+    public void testLatestSnapshotOfUser() throws IOException, InterruptedException {
+        FileIO localFileIO = LocalFileIO.create();
+        SnapshotManager snapshotManager =
+                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+        // create 100 snapshots using user "lastCommitUser"
+        for (long i = 0; i < 100; i++) {
+            Snapshot snapshot =
+                    new Snapshot(
+                            i,
+                            0L,
+                            null,
+                            null,
+                            null,
+                            null,
+                            "lastCommitUser",
+                            0L,
+                            Snapshot.CommitKind.APPEND,
+                            i * 1000,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null,
+                            null);
+            localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
+        }
+
+        // read the latest snapshot of user "currentCommitUser"
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                snapshotManager.latestSnapshotOfUser("currentCommitUser");
+                            } catch (Exception e) {
+                                exception.set(e);
+                            }
+                        });
+        thread.start();
+        Thread.sleep(100);
+
+        // expire snapshot
+        localFileIO.deleteQuietly(snapshotManager.snapshotPath(0));
+        thread.join();
+
+        assertThat(exception.get()).isNull();
+    }
+
     @Test
     public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException {
         FileIO localFileIO = LocalFileIO.create();

Reply via email to